2

Spring Cloud Streams と Dataflow を学び始めたばかりで、重要なユースケースの 1 つを知りたいです。メッセージを受け取り、それを 5 回再送信して出力するサンプル プロセッサ Multiplier を作成しました。

@EnableBinding(Processor.class)
public class MultiplierProcessor {
    @Autowired
    private Source source;

    private int repeats = 5;

    @Transactional
    @StreamListener(Processor.INPUT)
    public void handle(String payload) {
        for (int i = 0; i < repeats; i++) {
            if(i == 4) {
                throw new RuntimeException("EXCEPTION");
            }
            source.output().send(new GenericMessage<>(payload));
        }
    }
}

5 番目の送信前に、このプロセッサがクラッシュすることがわかります。なんで?それができるからです(プログラムは例外をスローします)。この場合、Spring Cloud Stream で障害防止を実践したかったのです。

私が達成したいのは、入力メッセージを DLQ に戻し、前に送信された 4 つのメッセージを元に戻し、次のオペランドで消費されないようにすることです (通常の JMS トランザクションと同様)。プロセッサ プロジェクトで次のプロパティを定義しようとしましたが、成功しませんでした。

spring.cloud.stream.bindings.output.producer.autoBindDlq=true
spring.cloud.stream.bindings.output.producer.republishToDlq=true
spring.cloud.stream.bindings.output.producer.transacted=true

spring.cloud.stream.bindings.input.consumer.autoBindDlq=true

可能かどうか、また何が間違っているのか教えていただけますか? いくつかの例を挙げていただければ幸いです。

4

1 に答える 1

4

構成にいくつかの問題があります。

  • .rabbitうさぎ固有のプロパティがありません)
  • 使用するには、グループ名と永続的なサブスクリプションが必要ですautoBindDlq
  • autoBindDlq出力側には適用されません

プロデューサの送信が同じトランザクションで実行されるように、コンシューマを処理する必要があります。

これを 1.0.2.RELEASE でテストしました。

spring.cloud.stream.bindings.output.destination=so8400out

spring.cloud.stream.rabbit.bindings.output.producer.transacted=true

spring.cloud.stream.bindings.input.destination=so8400in
spring.cloud.stream.bindings.input.group=so8400

spring.cloud.stream.rabbit.bindings.input.consumer.durableSubscription=true
spring.cloud.stream.rabbit.bindings.input.consumer.autoBindDlq=true
spring.cloud.stream.rabbit.bindings.input.consumer.transacted=true

期待どおりに機能しました。

編集

実際、いいえ、公開されたメッセージはロールバックされませんでした。調査中...

EDIT2

わかった; 機能しますが、使用できません。これをrepublishToDlq有効にすると、バインダーが失敗したメッセージを DLQ に発行し、トランザクションがコミットされるためです。

それが false の場合、コンテナーに例外がスローされ、トランザクションがロールバックされ、RabbitMQ が失敗したメッセージを DLQ に移動します。

ただし、再試行はデフォルトで有効になっている (3 回の試行) ため、プロセッサが再試行中に成功すると、出力に重複が生じることに注意してください。

これを希望どおりに機能させるには、最大試行回数を 1 に設定して再試行を無効にする必要があります (使用しないでくださいrepublishToDlq)。

EDIT3

OK、エラーの公開をさらに制御したい場合は、この JIRAの修正が Spring AMQP に適用されたときに機能します...

@SpringBootApplication
@EnableBinding({ Processor.class, So39018400Application.Errors.class })
public class So39018400Application {

    public static void main(String[] args) {
        SpringApplication.run(So39018400Application.class, args);
    }

    @Bean
    public Foo foo() {
        return new Foo();
    }

    public interface Errors {

        @Output("errors")
        MessageChannel errorChannel();

    }

    private static class Foo {

        @Autowired
        Source source;

        @Autowired
        Errors errors;

        @StreamListener(Processor.INPUT)
        public void handle (Message<byte[]> in) {
            try {
                source.output().send(new GenericMessage<>("foo"));
                source.output().send(new GenericMessage<>("foo"));
                throw new RuntimeException("foo");
            }
            catch (RuntimeException e) {
                errors.errorChannel().send(MessageBuilder.fromMessage(in)
                        .setHeader("foo", "bar") // add whatever you want, stack trace etc.
                        .build());
                throw e;
            }
        }

    }

}

プロパティ付き:

spring.cloud.stream.bindings.output.destination=so8400out

spring.cloud.stream.bindings.errors.destination=so8400errors
spring.cloud.stream.rabbit.bindings.errors.producer.transacted=false


spring.cloud.stream.rabbit.bindings.output.producer.transacted=true

spring.cloud.stream.bindings.input.destination=so8400in
spring.cloud.stream.bindings.input.group=so8400

spring.cloud.stream.rabbit.bindings.input.consumer.transacted=true
spring.cloud.stream.rabbit.bindings.input.consumer.requeue-rejected=false
spring.cloud.stream.bindings.input.consumer.max-attempts=1
于 2016-08-18T14:01:07.567 に答える