7

現在、基本的に次の簡略化されたメカニズムでメッセージを確認しています。

  @KafkaListener(topics = "someTopic")
  public void listen(final String message, final Acknowledgment ack) {
    try {
        processMessage(message);
        ack.acknowledge();
    } catch (final IOException e) {
        // do not acknowledge here since we can temporarily not process the message
    } 

基本的に、メッセージを一時的に処理できない場合 (IOExceptions の場合) はいつでも、後でもう一度受信する必要があります。

しかし、確認応答は同じパーティション内の以前のすべてのメッセージが正常に処理されたと想定しているため、これは機能しません。また、IOException の場合、失敗したメッセージはスキップされますが、同じパーティションのより高いインデックスを持つ別のメッセージによって確認応答される可能性があります。

これを修正する方法はいくつかありますが、これは、KafkaListener メソッド内で直接確認応答を呼び出すことを回避するための厄介な回避策を意味します。私たちのユースケースは非常に具体的なものですか、それともSpring Kafkaユーザーが想定する「デフォルト」の動作に似ていませんか?

この種の問題に対する春のカフカの解決策はありますか? または、これを「正しく」解決するアイデアはありますか?

4

1 に答える 1

3

それがまさにカフカの仕組みです。次のメッセージに進む前に、再試行を有効にして IOException を通過しようとすることができます。

エラー ハンドルを構成して、失敗したメッセージを別のトピックに発行し、後で再生することができます。または、エラー ハンドラーはコンテナーを停止して、新しい配信を防ぐことができます。

コンテナが再起動されると、メッセージが再生されます。

于 2017-01-04T14:12:25.133 に答える