Kafkaからデータを読み取り、変換後にこのデータをhttpエンドポイント(または別のkafka - この質問ではhttpを考えてみましょう)に送信する単純なスパークストリーミングアプリケーションがあります。job-serverを使用してジョブを送信しています。
現在、「auto.offset.reset」="smallest" および interval=3s でソース kafka から消費を開始しています。幸せな場合、すべてが良さそうに見えます。ここに抜粋があります:
kafkaInputDStream.foreachRDD(rdd => {
rdd.foreach(item => {
//This will throw exception if http endpoint isn't reachable
httpProcessor.process(item._1, item._2)
})
})
"auto.offset.reset"="smallest" であるため、これは 1 つのジョブで約 200K のメッセージを処理します。ジョブの途中で http サーバーを停止し (POST で問題をシミュレート)、httpProcessor.process が例外をスローすると、そのジョブは失敗し、未処理のものはすべて失われます。その後、3秒ごとにポーリングし続けていることがわかります。
だから私の質問は:
- 次の 3 秒のジョブで X メッセージを受け取り、エラーが発生する前に Y のみを処理できた場合、残りの XY は処理されないという私の仮定は正しいですか?
- Kafka からのストリーム/消費を一時停止する方法はありますか? たとえば、断続的なネットワークの問題が発生し、消費されたすべてのメッセージがその間に失われる可能性が高い場合です。再試行を続けるもの (おそらく指数バックオフ) で、http エンドポイントがアップするたびに、再び消費を開始します。
ありがとう