あなたの声明は完全に真実ではありません。「コミットするかどうかを制御する」ことはできません-少なくとも直接ではありません(プロセッサAPIでもDSLでも)。追加のコミットProcessorContext#commit()
を要求するためにのみ使用できます。したがって、Streams の呼び出し後、できるだけ早くコミットしようとしますが、即時のコミットではありません。さらに、 を呼び出さなくても Streams は自動的にコミットします。Streams 設定を介して Streams コミット間隔を制御できます( http://docs.confluent.io/current/streams/developer-guide.html#configuring-a-kafka-streams-applicationを参照) 。#commit()
#commit()
commit.interval.m
「問題」が発生した場合は、問題の種類によって対応方法が異なります。
- 回復できない問題を検出した場合は、例外をスローして「世界を止める」ことしかできません (以下を参照)。
- 回復可能なエラーが発生した場合は、独自のコード内で「ループ」する必要があります (たとえば、問題が解決され、現在のメッセージを正常に処理できるようになるまで) (タイムアウト、つまり例外
Processor#process()
、KeyValueMapper#apply()
この戦略を使用する -- コンシューマー構成heartbeat.interval.ms
と 0.10.1 session.timeout.ms
[KIP-62]を参照)
- 別の方法は、現在処理できないレコードを に入れ、
StateStore
後で処理することです。ただし、正しく理解するのは難しく、いくつかの Streams の仮定 (処理順序など) も破ります。使用はお勧めしません。使用する場合は、その影響について十分に注意する必要があります。
キャッチされていない例外がある場合は、終了し、StreamThread
コミットは発生しません (例外ハンドラーを登録して、これについて通知を受け取ることができます: http://docs.confluent.io/current/streams/developer-guide.html#using-kafka- stream-within-your-application-code . すべてがStreamThread
停止した場合は、アプリケーションを再起動するために の新しいインスタンスを作成する必要がありKafkaStreams
ます。
メッセージが正常に処理される前にユーザー コードから戻ってはなりません。これは、戻った場合、Streams はメッセージが正常に処理されたと想定する (したがって、対応するオフセットをコミットする可能性がある) ためです。箇条書き (3) に関しては、後で処理するためにレコードを特別な StateStore に入れることは、「正常に」処理されたレコードと見なされます。