11

私は、kafka 経由で hdfs に書き込む必要があるプロジェクトに取り組んでいます。メッセージをkafkaに書き込むオンラインサーバーがあるとします。各メッセージにはタイムスタンプが含まれています。出力がメッセージのタイムスタンプに従ってファイル/ファイルになるジョブを作成したい。たとえば、kafka のデータが

 {"ts":"01-07-2013 15:25:35.994", "data": ...}
 ...    
 {"ts":"01-07-2013 16:25:35.994", "data": ...}
 ... 
 {"ts":"01-07-2013 17:25:35.994", "data": ...}

3つのファイルを出力として取得したい

  kafka_file_2013-07-01_15.json
  kafka_file_2013-07-01_16.json
  kafka_file_2013-07-01_17.json 

そしてもちろん、このジョブをもう一度実行していて、次のような新しいメッセージがキューにある場合

 {"ts":"01-07-2013 17:25:35.994", "data": ...}

ファイルを作成する必要があります

  kafka_file_2013-07-01_17_2.json // second  chunk of hour 17

私はいくつかのオープン ソースを見てきましたが、それらのほとんどは kafka からいくつかの hdfs フォルダーを読み取ります。この問題に対する最善の解決策/設計/オープンソースは何ですか

4

5 に答える 5

7

あなたは間違いなくCamus APIlinkedInからの実装をチェックアウトする必要があります. Camus は、LinkedIn の Kafka->HDFS パイプラインです。これは、Kafka から分散データをロードする mapreduce ジョブです。Twitter ストリームからフェッチし、ツイートのタイムスタンプに基づいて HDFS に書き込む簡単な例については、私が書いたこの投稿を確認してください。

プロジェクトは、 https://github.com/linkedin/camusの github で入手できます。

Camus には、Kafka からのデータの読み取りとデコード、および HDFS へのデータの書き込みのために、2 つの主要なコンポーネントが必要です –</p>

Kafka から読み取ったメッセージのデコード

Camus には、Kafka からのメッセージのデコードに役立つ一連の Decoders があります。Decoders は基本的に拡張com.linkedin.camus.coders.MessageDecoderされ、タイムスタンプに基づいてデータを分割するロジックを実装します。このディレクトリには一連の事前定義された Decoder があり、これらに基づいて独自の Decoder を作成できます。 camus/camus-kafka-coders/src/main/java/com/linkedin/camus/etl/kafka/coders/

HDFS へのメッセージの書き込み

Camus は、Camus に HDFS に書き込む必要のあるペイロードを伝える拡張 RecordWriterProvider クラスのセットを必要com.linkedin.camus.etl.RecordWriterProviderとします。事前定義された RecordWriterProvider のセットはこのディレクトリに存在し、これらに基づいて独自に作成できます。

camus-etl-kafka/src/main/java/com/linkedin/camus/etl/kafka/common
于 2015-02-19T05:50:14.583 に答える
2

よりリアルタイムなアプローチを探している場合は、StreamSets Data Collectorをチェックしてください。これは、取り込み用の Apache ライセンスのオープン ソース ツールでもあります。

HDFS 宛先は、指定したテンプレートに基づいて時間ベースのディレクトリに書き込むように構成できます。また、受信メッセージのフィールドを指定して、メッセージを書き込む時間を決定する方法も既に含まれています。構成は「時間基準」と呼ばれ、次のようなものを指定できます${record:value("/ts")}

*完全な開示私はこのツールに取り組んでいるエンジニアです。

于 2015-11-11T00:10:29.800 に答える
1

Kafka から HDFS への継続的な取り込みについては、こちらをご覧ください。Apache Apexに依存しているため、Apex が提供する保証があります。

https://www.datatorrent.com/apphub/kafka-to-hdfs-sync/

于 2016-11-15T01:28:39.360 に答える
0

カミュをチェックアウト: https://github.com/linkedin/camus

ただし、これは Avro 形式でデータを書き込みます... 他の RecordWriters はプラグイン可能です。

于 2013-07-10T00:09:11.513 に答える