spring-cloud-stream 1.0.0.RELEASE のドキュメントを少し調べましたが、エラー処理に関するドキュメントが見つからないようです。
kafka 0.9 での観察によると、消費者が RuntimeException をスローすると、3 回の再試行が見られます。3 回再試行した後、ログに次のように表示されます。
2016-05-17 09:35:59.216 ERROR 8983 --- [ kafka-binder-] o.s.i.k.listener.LoggingErrorHandler : Error while processing: KafkaMessage [Message(magic = 0, attributes = 0, crc = 3731457175, key = null, payload = java.nio.HeapByteBuffer[pos=0 lim=130 cap=130]), KafkaMessageMetadata [offset=2, nextOffset=3, Partition[topic='reservation', id=1]]
org.springframework.messaging.MessagingException: Exception thrown while invoking demo.sink.ReservationConsumer#handleReservation[1 args]; nested exception is java.lang.RuntimeException: no message
この時点で、コンシューマー オフセットは 1 遅れます。コンシューマーを再起動すると、メッセージは 3 回再試行されます。ただし、コンシューマーが例外をスローしないように同じパーティションに別のメッセージを送信すると、コンシューマー オフセットが更新され、例外をスローした元のメッセージは再起動後に再試行されなくなります。
これは、私が見つけられなかったどこかに文書化されていますか? エラー処理はバインダー固有ですか、それともバインダー間で一貫性を保つために scs を抽象化していますか? これは、消費者オフセットが kafka バインダーで更新される方法の予期しない結果であると思われます。enableDlq kafka コンシューマー プロパティが追加されたことがわかりました。それをテストしようとしていますが、kafka でデッド レターを処理する方法がわかりません。私はrabbitmqのデッドレターキューに精通していますが、rabbitmqではrabbitmq shovelプラグインを使用してdlqメッセージを再発行して再試行し、一時的なサービス停止が原因で失敗した場合をカバーできます. 同様のユーティリティを自分で作成する以外に、kafka で使用できる同様の機能については知りません。
更新: enableDlq kafka コンシューマー プロパティを有効にしてテストすると、エラー処理に関する同じコンシューマー オフセットの問題が示されます。コンシューマーが RuntimeException をスローすると、再試行が 3 回表示された後、エラー メッセージがログに記録されずerror.<destination>.<group>
、文書化されているように発行されたメッセージが表示されますが、コンシューマー オフセットは更新されず、1 遅れます。コンシューマーを再起動すると、元のトピック パーティションから同じ失敗したメッセージの処理を再試行し、3 回再試行して、同じメッセージをerror.<destination>.<group>
トピックに再度配置します (重複した dlq メッセージ)。コンシューマーが RuntimeException をスローしない同じトピック パーティションに別のメッセージを発行すると、オフセットが更新され、元の失敗したメッセージは再起動時に再試行されなくなります。
enableDlqがtrueかどうかに関係なく、消費者がエラーをスローしたときに、消費者はkafkaで消費者オフセットを更新する必要があると思います。これにより、すべての再試行に失敗したメッセージが破棄される (enableDlq が false の場合) か、dlq に発行されて再試行されない (enableDlq が true の場合) という一貫性が少なくとも保たれます。