Spring Cloud Stream と Kafka を統合して Spring Boot アプリケーションを作成しようとしています。Kafka で 1 つのパーティションを持つサンプル トピックを作成し、ここに示す指示に基づいて作成された Spring Boot アプリケーションからトピックに公開しました。
http://docs.spring.io/spring-cloud-stream/docs/1.0.2.RELEASE/reference/htmlsingle/index.html
と
https://blog.codecentric.de/en/2016/04/event-driven-microservices-spring-cloud-stream/
Spring Boot アプリ -
@SpringBootApplication
public class MyApplication {
private static final Log logger = LogFactory.getLog(MyApplication.class);
public static void main(String[] args) {
SpringApplication.run(MyApplication.class, args);
}
}
Kafka プロデューサー クラス
@Service
@EnableBinding(Source.class)
public class MyProducer {
private static final Log logger = LogFactory.getLog(MyProducer.class);
@Bean
@InboundChannelAdapter(value = Source.OUTPUT, poller = @Poller(fixedDelay = "10000", maxMessagesPerPoll = "1"))
public MessageSource<TimeInfo> timerMessageSource() {
TimeInfo t = new TimeInfo(new Timestamp(new Date().getTime())+"","Label");
MessageBuilder<TimeInfo> m = MessageBuilder.withPayload(t);
return () -> m.build();
}
public static class TimeInfo{
private String time;
private String label;
public TimeInfo(String time, String label) {
super();
this.time = time;
this.label = label;
}
public String getTime() {
return time;
}
public String getLabel() {
return label;
}
}
}
例外を処理したい場合を除いて、すべてがうまく機能しています。
Kafka トピックがダウンした場合、アプリのログ ファイルに ConnectionRefused 例外がスローされていることがわかりますが、組み込みの再試行ロジックは停止することなく継続的に再試行しているようです!
私が処理してさらに例外処理を行うためにスローされる例外はまったくありません。上記の Spring Cloud Stream ドキュメントで Kafka の Producer オプションと Binder オプションを読みましたが、この例外をスローしてキャプチャするためのカスタマイズ オプションが見つかりません。
私はSpring Boot / Spring Cloud Stream / Spring Integration(クラウドストリームプロジェクトの基盤となる実装のようです)を初めて使用しています。
この例外をSpring Cloud Streamアプリにカスケードするために知っていることは他にありますか?