3

データが生成されるとすぐに Druid に送信される場合は、(IoT のように) すべて問題ありません。大好きです。

しかし、今ではデータ入力の遅れに起因する別の状況があります。

エンドユーザーはオフラインになる (インターネット接続が失われる) 可能性があり、データは携帯電話に保存され、オンラインに戻ったときにのみ Druid に送信されます。

つまり、彼女がインターネットを回復するまでに、ドルイドに送信されたデータ (たとえば、Tranquility サーバーを介して) は、ドルイドによって拒否されます (ドルイド リアルタイムは過去のデータを受け入れないため)。

もちろん、データがサーバーに送信される時刻にタイムスタンプを設定できます。しかし、それはレポートを歪めます...、別のフィールド (たとえば、 generated_ts としましょう) を追加し、それをさらに別のディメンションとして宣言する場合を除きます。

しかし、そうなると、Druid (?) で無料で取得できる時間ベースの自動ロールアップの恩恵を受けることはできません。次のように、groupBy (その generated_ts をディメンションの 1 つとして) を使用する必要があります。

{
  "queryType": "groupBy",
  "dataSource": "my_datasource",
  "granularity": "none",
  "dimensions": [
    "city",
    {
      "type" : "extraction",
      "dimension" : "generated_ts",
      "outputName" :  "dayOfWeek",
      "extractionFn" : {
        "type" : "timeFormat",
        "format" : "EEEE"
      }
    }
  ],
  ...
}

私の質問は次のとおりです。

  1. アプローチは有効ですか?
  2. はいの場合: 罰則は何ですか? (パフォーマンスだと思いますが、どれくらい悪いですか?)

ありがとう、ラカ

--

以下のRamkumarの回答への回答、フォローアップの質問:

私はまだこのバッチ取り込みをよく理解していません:

イベント A を考えてみましょう。これはタイムスタンプ 3 で生成され、タイムスタンプ 15 までサーバーに送信されませんでした。

タイムスタンプ 15 で送信されると、値は {ts: 15, generated_ts: 3, metric1: 12, dimension1: 'a'} になります。

キーのタイムスタンプは「ts」です。

不正確です。理想は {ts: 3, generated_ts: 3, metric1: 12, dimension1: 'a'} ですが、Tranquility が受け入れるように、insert_ts として 15 を指定する必要がありました。

ここで、バッチ インジェスト中に修正したいと思います。正しい ts {ts: 3, generated_ts: 3, metric1: 12, dimension1: 'a'} になりました。

質問: その場合、イベントが重複しますか?

または...(これは私が推測する):指定された時間間隔のバッチ取り込みは、基本的にその間隔内のすべてのデータを置き換えますか?(これが事実であることを願っています。そうすれば、データの重複について心配するのをやめることができます)

追加のメモ (ちょうど): 私はこれに出くわしました: https://github.com/druid-io/tranquility/blob/master/docs/overview.md#segment-granularity-and-window-period

それは言う:

Metamarkets での私たちのアプローチは、すべてのデータを Tranquility を介してリアルタイムで送信することですが、コピーを S3 に保存し、夜間の Hadoop バッチ インデックス作成ジョブをフォローアップしてデータを再取り込みすることで、これらのリスクを軽減することもできます。これにより、最終的にすべてのイベントが Druid で正確に 1 回表現されることが保証されます。

ということは… 再摂取ということで、その意味は(推測ですが)完全な置き換えですよね?

4

1 に答える 1

2

同様の問題があり、ラムダ アーキテクチャを使用して解決しました。セットアップには 2 つのパイプラインがあります。

  1. 私たちのリアルタイム パイプラインは Kafka+Spark からフィードし、druid に取り込みます。これはリアルタイムのデータになります。ただし、ドルイドが期待する粒度よりも古いデータは拒否されます。そのため、データ到着が遅れるとデータが失われます。
  2. 私たちのバッチ パイプラインは 1 時間ごとに Hadoop にデータを取り込み、次にバッチ取り込みジョブを Druid にトリガーします。これにより、キーに記載されているタイムスタンプのセグメントが作成され、集計が行われ、古いセグメントが同じタイムスタンプに置き換えられます。実際には、druid のストレージの原則は、不変性、MVCC、およびログ構造のストレージに基づいています。そのため、同じタイムスタンプの新しいバージョンのセグメントが来ると、古いセグメントはガベージ コレクションされます。

バッチ インジェストの詳細: バッチ ジョブは、1 時間ごとのフォルダーに編成された HDFS からデータを操作します。遅れて発生したイベントは、適切な 1 時間ごとのバケットに入れられます。遅れたデータに対する SLA は XXX 時間です (優れた記事を読んだ場合はウォーターマークと呼ばれます)。そのため、現在の時間を取得して XXX を減算し、対応する時間ごとのフォルダー ファイルを取得して、その特定の時間に druid でバッチ インジェスト ジョブをトリガーします。イベントがウォーターマークの前に到着した場合、これでもデータ損失が発生する可能性があることに注意してください。 HDFS 側のストレージは非常に限られています。

于 2016-10-14T07:00:44.310 に答える