構成にいくつかの問題があります。
.rabbit
うさぎ固有のプロパティがありません)
- 使用するには、グループ名と永続的なサブスクリプションが必要です
autoBindDlq
autoBindDlq
出力側には適用されません
プロデューサの送信が同じトランザクションで実行されるように、コンシューマを処理する必要があります。
これを 1.0.2.RELEASE でテストしました。
spring.cloud.stream.bindings.output.destination=so8400out
spring.cloud.stream.rabbit.bindings.output.producer.transacted=true
spring.cloud.stream.bindings.input.destination=so8400in
spring.cloud.stream.bindings.input.group=so8400
spring.cloud.stream.rabbit.bindings.input.consumer.durableSubscription=true
spring.cloud.stream.rabbit.bindings.input.consumer.autoBindDlq=true
spring.cloud.stream.rabbit.bindings.input.consumer.transacted=true
期待どおりに機能しました。
編集
実際、いいえ、公開されたメッセージはロールバックされませんでした。調査中...
EDIT2
わかった; 機能しますが、使用できません。これをrepublishToDlq
有効にすると、バインダーが失敗したメッセージを DLQ に発行し、トランザクションがコミットされるためです。
それが false の場合、コンテナーに例外がスローされ、トランザクションがロールバックされ、RabbitMQ が失敗したメッセージを DLQ に移動します。
ただし、再試行はデフォルトで有効になっている (3 回の試行) ため、プロセッサが再試行中に成功すると、出力に重複が生じることに注意してください。
これを希望どおりに機能させるには、最大試行回数を 1 に設定して再試行を無効にする必要があります (使用しないでくださいrepublishToDlq
)。
EDIT3
OK、エラーの公開をさらに制御したい場合は、この JIRAの修正が Spring AMQP に適用されたときに機能します...
@SpringBootApplication
@EnableBinding({ Processor.class, So39018400Application.Errors.class })
public class So39018400Application {
public static void main(String[] args) {
SpringApplication.run(So39018400Application.class, args);
}
@Bean
public Foo foo() {
return new Foo();
}
public interface Errors {
@Output("errors")
MessageChannel errorChannel();
}
private static class Foo {
@Autowired
Source source;
@Autowired
Errors errors;
@StreamListener(Processor.INPUT)
public void handle (Message<byte[]> in) {
try {
source.output().send(new GenericMessage<>("foo"));
source.output().send(new GenericMessage<>("foo"));
throw new RuntimeException("foo");
}
catch (RuntimeException e) {
errors.errorChannel().send(MessageBuilder.fromMessage(in)
.setHeader("foo", "bar") // add whatever you want, stack trace etc.
.build());
throw e;
}
}
}
}
プロパティ付き:
spring.cloud.stream.bindings.output.destination=so8400out
spring.cloud.stream.bindings.errors.destination=so8400errors
spring.cloud.stream.rabbit.bindings.errors.producer.transacted=false
spring.cloud.stream.rabbit.bindings.output.producer.transacted=true
spring.cloud.stream.bindings.input.destination=so8400in
spring.cloud.stream.bindings.input.group=so8400
spring.cloud.stream.rabbit.bindings.input.consumer.transacted=true
spring.cloud.stream.rabbit.bindings.input.consumer.requeue-rejected=false
spring.cloud.stream.bindings.input.consumer.max-attempts=1