問題タブ [reactor-kafka]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
java - 数回の処理試行が失敗した後、reactor-kafka でオフセットを Kafka にコミットします
メッセージが到着する Kafka トピックがあります。メッセージを読んで処理し、次のメッセージに進む必要があります。メッセージ処理は失敗する可能性があり、失敗した場合、次のメッセージに進む前に処理を数回 (たとえば 10 回) 再試行する必要があります。処理が 10 回失敗した場合、メッセージを破棄する必要があり、次のメッセージに進む必要があります。
を使用しますreactor-kafka
。すべての処理はリアクティブである必要があります。
これを解決しようとした方法は次のとおりです。
(ここreceiver
に がありますKafkaReceiver<String, String>
)。
これは例外のない場合に機能し、例外がある場合processRecord()
は 10 回リトライします。ここでの問題は、許可された 10 回の試行後にまだ失敗した場合、オフセットは (もちろん) コミットされないため、次回は同じオフセットが Kafka から読み取られるため、事実上、処理は「障害のある」オフセットで永久にスタックすることです。 .
私は次の明白なアイデアを実装しようとしました: 例外がretryBackoff()
演算子よりも「さらに通過」した場合、現在のオフセットをコミットします。オフセットをコミットするには が必要なので、現在のレコードと一緒にReceiverRecord
例外のラッピングを追加します。ExceptionWithRecord
と
extractRecordAndMaybeCommit()
指定された例外から を抽出してReceiverRecord
コミットします。
レコードを渡し、再試行が使い果たされた場合に後でコミットするこの方法は機能し、.commit()
メソッドが呼び出されます。しかし、それは効果がありません。
上記のリアクティブ パイプラインに例外が入ると、DefaultKafkaReceiver.dispose()
が呼び出されるため、後続のコミットの試行は無視されることがわかります。したがって、パブリッシャーによって例外が「認識」されるとすぐにオフセットをコミットすることは不可能であることがわかります。
を使用している間に「N 個のエラー後にコミット」動作を実装するにはどうすればよいreactor-kafka
ですか?