0

2 つの IntegrationFlowsが あり、どちらも Apache Kafka からメッセージを受信します

最初の IntegrationFlow - 入力チャネルで、Consumer1(concurrency=4)が topic_1 を読み取ります

2 番目の IntegrationFlow - 入力チャネルで、Consumer2(concurrency=4)が topic_2 を読み取ります

しかし、これら2 つの IntegrationFlowsは、 1 つの共通クラスMyMessageHandlerが指定されている出力チャネルにメッセージを送信します。

このような:

@Bean
public IntegrationFlow sendFromQueueFlow1(MyMessageHandler message) {
    return IntegrationFlows
            .from(Kafka
                    .messageDrivenChannelAdapter(consumerFactory1, "topic_1")
                    .configureListenerContainer(configureListenerContainer_priority1)
                    )
            .handle(message)
            .get();
}


@Bean
public IntegrationFlow sendFromQueueFlow2(MyMessageHandler message) {
    return IntegrationFlows
            .from(Kafka
                    .messageDrivenChannelAdapter(consumerFactory2, "topic_2")
                    .configureListenerContainer(configureListenerContainer_priority2)
                    )
            .handle(message)
            .get();
}

クラスMyMessageHandlerにはメソッドsend(message)があり、このメソッドはメッセージをさらに別のサービスに渡します

class MyMessageHandler {
            
    protected void handleMessageInternal(Message<?> message)
    {
        String postResponse = myService.send(message); // remote service calling
        msgsStatisticsService.sendMessage(message, postResponse);
        // *******
    }
}

各 IntegrationFlow 内で、4 つのコンシューマー スレッドが動作し(合計 8​​ つのスレッド)、それらはすべて 1 つのクラス MyMessageHandler に移動し、1 つのメソッドsend() になります。

どのような問題が考えられますか? 2 つの IntegrationFlow は、メッセージを 1 つの共通クラスに渡すときにお互いを認識しますか? MyMessageHandler クラスでスレッド セーフを提供する必要がありますか? send () メソッドの先頭に「synchronized 」という単語を追加する必要がありますか ???

しかし、3 つ目の IntegrationFlow を作成するとどうなるでしょうか?

1 つの IntegrationFlow だけがそれ自体を介して MyMessageHandler クラスにメッセージを渡すことができるようにするには? それはスレッドセーフでしょうか?例:

@Bean
public IntegrationFlow sendFromQueueFlow1() {
return IntegrationFlows
        .from(Kafka
                .messageDrivenChannelAdapter(consumerFactory1, "topic_1")
                .configureListenerContainer(configureListenerContainer_priority1)
        )
        .channel(**SOME_CHANNEL**())
        .get();

}

@Bean
public IntegrationFlow sendFromQueueFlow2() {
return IntegrationFlows
        .from(Kafka
                .messageDrivenChannelAdapter(consumerFactory2, "topic_2")
                .configureListenerContainer(configureListenerContainer_priority2)
        )
        .channel(**SOME_CHANNEL**())
        .get();

}

@Bean
public MessageChannel **SOME_CHANNEL**() {

    DirectChannel channel = new DirectChannel();
    return channel;
 }

@Bean
public IntegrationFlow sendALLFromQueueFlow(MyMessageHandler message) {

return IntegrationFlows
        .from(**SOME_CHANNEL**())
        .handle(message)
        .get();
}
4

1 に答える 1