4

Kafkaからデータを読み取り、変換後にこのデータをhttpエンドポイント(または別のkafka - この質問ではhttpを考えてみましょう)に送信する単純なスパークストリーミングアプリケーションがあります。job-serverを使用してジョブを送信しています。

現在、「auto.offset.reset」="smallest" および interval=3s でソース kafka から消費を開始しています。幸せな場合、すべてが良さそうに見えます。ここに抜粋があります:

kafkaInputDStream.foreachRDD(rdd => {
  rdd.foreach(item => {
  //This will throw exception if http endpoint isn't reachable
      httpProcessor.process(item._1, item._2)
  })
})

"auto.offset.reset"="smallest" であるため、これは 1 つのジョブで約 200K のメッセージを処理します。ジョブの途中で http サーバーを停止し (POST で問題をシミュレート)、httpProcessor.process が例外をスローすると、そのジョブは失敗し、未処理のものはすべて失われます。その後、3秒ごとにポーリングし続けていることがわかります。

だから私の質問は:

  1. 次の 3 秒のジョブで X メッセージを受け取り、エラーが発生する前に Y のみを処理できた場合、残りの XY は処理されないという私の仮定は正しいですか?
  2. Kafka からのストリーム/消費を一時停止する方法はありますか? たとえば、断続的なネットワークの問題が発生し、消費されたすべてのメッセージがその間に失われる可能性が高い場合です。再試行を続けるもの (おそらく指数バックオフ) で、http エンドポイントがアップするたびに、再び消費を開始します。

ありがとう

4

2 に答える 2