14

Kafka 0.8.2AdExchange からデータを受信するために使用Spark Streaming 1.4.1してから、データを に保存するために使用していMongoDBます。

私の問題は、Spark Streamingたとえば、新しいバージョンの更新、バグの修正、新しい機能の追加など、ジョブを再起動するときです。その時点で最新offsetのものを引き続き読み取り、ジョブの再起動中に AdX が kafka にプッシュしたデータを失います。kafka

私は何かを試してみますauto.offset.reset -> smallestが、0から受信します->最後に、データが巨大でdbで重複していました。

また、特定のものを設定しようとgroup.idconsumer.idますSparkが、同じです。

offset消費された最新のスパークを保存する方法、zookeeperまたはkafkaそれから最新のものに読み戻す方法はoffset?

4

4 に答える 4

15

createDirectStream 関数のコンストラクターの 1 つは、キーとしてパーティション ID を保持し、値として消費を開始するオフセットを保持するマップを取得できます。

ここで api を見てください: http://spark.apache.org/docs/2.2.0/api/java/org/apache/spark/streaming/kafka/KafkaUtils.html 私が話していたマップは通常次のように呼ばれます: fromOffsets

マップにデータを挿入できます。

startOffsetsMap.put(TopicAndPartition(topicName,partitionId), startOffset)

そして、直接ストリームを作成するときにそれを使用します:

KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
                streamingContext, kafkaParams, startOffsetsMap, messageHandler(_))

各反復の後、次を使用して処理されたオフセットを取得できます。

rdd.asInstanceOf[HasOffsetRanges].offsetRanges

このデータを使用して、次の反復で fromOffsets マップを構築できます。

完全なコードと使用法は、ページの最後にあるhttps://spark.apache.org/docs/latest/streaming-kafka-integration.htmlで確認できます。

于 2015-08-06T06:55:56.403 に答える
2

Michael Kopaniov の回答に追加すると、オフセットのマップを保存およびロードする場所として ZK を本当に使用したい場合は、可能です。

ただし、結果は ZK に出力されないため、出力操作がべき等でない限り、信頼できるセマンティクスは得られません (そうではないように思えます)。

結果を mongo の同じドキュメントにオフセットと一緒に単一のアトミック アクションで保存できる場合は、それが適している可能性があります。

詳細については、https://www.youtube.com/watch?v=fXnNEq1v3VAを参照してください。

于 2015-08-10T17:33:18.187 に答える
-1

私はまだこれを 100% 理解していませんが、最善の策はおそらく JavaStreamingContext.checkpoint() をセットアップすることです。

例については、 https://spark.apache.org/docs/1.3.0/streaming-programming-guide.html#checkpointingを参照してください。

いくつかのブログエントリによるとhttps://github.com/koeninger/kafka-exactly-once/blob/master/blogpost.mdいくつかの注意事項がありますが、ほのめかされているだけで実際にはそうではない特定の周辺ケースが含まれているようにほとんど感じます説明した。

于 2015-08-07T12:43:59.213 に答える