2

実装したいシナリオは、 Kafka からのメッセージを消費して処理することです。何らかの条件が失敗した場合、メッセージを確認したくありません。これについては、Spring Cloud Stream リファレンス ドキュメントで見つけました。

autoCommitOffset メッセージが処理されたときにオフセットを自動コミットするかどうか。false に設定すると、確認応答ヘッダーがメッセージ ヘッダーで使用可能になり、遅延確認応答が行われます。

デフォルト: 真。

私の質問は、autoCommitOffset を false に設定した後、メッセージを確認するにはどうすればよいですか? コード例は非常に高く評価されます。

4

1 に答える 1

3

ここで質問への回答を提供しました https://github.com/spring-cloud/spring-cloud-stream/issues/575

本質的にそれは設定に帰着します spring.cloud.stream.kafka.bindings.input.consumer.autoCommitOffset=false

次に、確認ヘッダーを処理します。

@SpringBootApplication
@EnableBinding(Sink.class)
   public class ManuallyAcknowdledgingConsumer {

      public static void main(String[] args) {
         SpringApplication.run(ManuallyAcknowdledgingConsumer.class, args);
      }

      @StreamListener(Sink.INPUT)
      public void process(Message<?> message) {
         System.out.println(message.getPayload());
         Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
        if (acknowledgment != null) {
           System.out.println("Acknowledgment provided");
           acknowledgment.acknowledge();
        }
    }
}
于 2016-06-15T18:47:23.217 に答える