0

RabbitMQ で Spring Cloud Reactive Streams を使用しています。

Spring Reactive Stream は、メッセージをキューから取り出すとすぐにメッセージを確認しているように見えます。そのため、メッセージ処理中に発生する未処理の例外のエラーは、アプリケーションで処理する必要があります (これは、未処理の例外がスローされる可能性があり、メッセージが拒否される可能性がある非反応性ストリームとは異なるため、デッド レター キューに送信されます)。 )。

メッセージの送信中にアプリケーションが突然シャットダウンした場合、どのように対処すればよいでしょうか?

例えば:

  • アプリケーションがメッセージをキューから取り出します
  • アプリケーションはメッセージを確認済みとしてマークします
  • アプリケーションがメッセージ処理を開始します
  • メッセージ処理が完了する前にアプリケーションがシャットダウンされた

これが発生すると、メッセージは完全に失われたように見えます。これは、メッセージがキューから外れているにもかかわらず、アプリケーションが停止しているためです。これらのメッセージを回復するにはどうすればよいですか?

4

1 に答える 1

1

手動の承認を使用し、処理が非同期的に完了するまで承認を延期する必要があります。そのためには、メッセージ全体を消費する必要があります。

    @Bean
    public Consumer<Flux<Message<String>>> async() {
        return inbound -> inbound

                ...

                .map(msg -> {
                    try {
                        msg.getHeaders().get(AmqpHeaders.CHANNEL, Channel.class)
                                .basicAck(msg.getHeaders().get(AmqpHeaders.DELIVERY_TAG, Long.class), false);
                    }
                    catch (IOException e) {
                        e.printStackTrace();
                    }
                    return msg.getPayload();
                })
                .subscribe(System.out::println);
    }
spring:
  cloud:
    stream:
      function.definition: async
      bindings:
        async-in-0:
          destination: testtock
          group: async
      rabbit:
        bindings:
          async-in-0:
            consumer:
              acknowledge-mode: MANUAL
              prefetch: 10

basicRejectDLQ に再キューイングまたは送信するために使用します。

于 2020-04-09T22:26:30.147 に答える