2 つの IntegrationFlowsが あり、どちらも Apache Kafka からメッセージを受信します
最初の IntegrationFlow - 入力チャネルで、Consumer1(concurrency=4)が topic_1 を読み取ります
2 番目の IntegrationFlow - 入力チャネルで、Consumer2(concurrency=4)が topic_2 を読み取ります
しかし、これら2 つの IntegrationFlowsは、 1 つの共通クラスMyMessageHandlerが指定されている出力チャネルにメッセージを送信します。
このような:
@Bean
public IntegrationFlow sendFromQueueFlow1(MyMessageHandler message) {
return IntegrationFlows
.from(Kafka
.messageDrivenChannelAdapter(consumerFactory1, "topic_1")
.configureListenerContainer(configureListenerContainer_priority1)
)
.handle(message)
.get();
}
@Bean
public IntegrationFlow sendFromQueueFlow2(MyMessageHandler message) {
return IntegrationFlows
.from(Kafka
.messageDrivenChannelAdapter(consumerFactory2, "topic_2")
.configureListenerContainer(configureListenerContainer_priority2)
)
.handle(message)
.get();
}
クラスMyMessageHandlerにはメソッドsend(message)があり、このメソッドはメッセージをさらに別のサービスに渡します
class MyMessageHandler {
protected void handleMessageInternal(Message<?> message)
{
String postResponse = myService.send(message); // remote service calling
msgsStatisticsService.sendMessage(message, postResponse);
// *******
}
}
各 IntegrationFlow 内で、4 つのコンシューマー スレッドが動作し(合計 8 つのスレッド)、それらはすべて 1 つのクラス MyMessageHandler に移動し、1 つのメソッドsend() になります。
どのような問題が考えられますか? 2 つの IntegrationFlow は、メッセージを 1 つの共通クラスに渡すときにお互いを認識しますか? MyMessageHandler クラスでスレッド セーフを提供する必要がありますか? send () メソッドの先頭に「synchronized 」という単語を追加する必要がありますか ???
しかし、3 つ目の IntegrationFlow を作成するとどうなるでしょうか?
1 つの IntegrationFlow だけがそれ自体を介して MyMessageHandler クラスにメッセージを渡すことができるようにするには? それはスレッドセーフでしょうか?例:
@Bean
public IntegrationFlow sendFromQueueFlow1() {
return IntegrationFlows
.from(Kafka
.messageDrivenChannelAdapter(consumerFactory1, "topic_1")
.configureListenerContainer(configureListenerContainer_priority1)
)
.channel(**SOME_CHANNEL**())
.get();
}
@Bean
public IntegrationFlow sendFromQueueFlow2() {
return IntegrationFlows
.from(Kafka
.messageDrivenChannelAdapter(consumerFactory2, "topic_2")
.configureListenerContainer(configureListenerContainer_priority2)
)
.channel(**SOME_CHANNEL**())
.get();
}
@Bean
public MessageChannel **SOME_CHANNEL**() {
DirectChannel channel = new DirectChannel();
return channel;
}
@Bean
public IntegrationFlow sendALLFromQueueFlow(MyMessageHandler message) {
return IntegrationFlows
.from(**SOME_CHANNEL**())
.handle(message)
.get();
}