同じデータがそれぞれのキューに向かう途中で異なる変換を経た後、Spring Integration を使用して、1 つのチャネルから 2 つの異なる Kafka キューにデータを送信しようとしています。問題は、プロデューサー コンテキストが重複しているようで、その理由がわかりません。
ここに私のフロー構成があります:
flow -> flow
.channel(“firstChannel")
.publishSubscribeChannel(Executors.newCachedThreadPool(), s -> s
.subscribe(f -> f
.transform(firstTransformer::transform)
.channel(MessageChannels.queue(50))
.handle(Kafka.outboundChannelAdapter(kafkaConfig)
.addProducer(firstMetadata(), brokerAddress), e -> e.id(“firstKafkaOutboundChannelAdapter")
.autoStartup(true)
.poller(p -> p.fixedDelay(1000, TimeUnit.MILLISECONDS).receiveTimeout(0).taskExecutor(taskExecutor))
.get())
)
.subscribe(f -> f
.transform(secondTransformer::transform)
.channel(MessageChannels.queue(50))
.handle(Kafka.outboundChannelAdapter(kafkaConfig)
.addProducer(secondMetadata(), brokerAddress), e -> e.id(“secondKafkaOutboundChannelAdapter")
.autoStartup(true)
.poller(p -> p.fixedDelay(1000, TimeUnit.MILLISECONDS).receiveTimeout(0).taskExecutor(taskExecutor))
.get())
));
例外は次のとおりです。
オブジェクト [org.springframework.integration.kafka.support.KafkaProducerContext@3163987e] を Bean 名 'not_specified' で登録できませんでした: 既にオブジェクト [org.springframework.integration.kafka.support.KafkaProducerContext@15f193b8] がバインドされています
kafkaConfig
さまざまなオブジェクトを使用してみましたが、役に立ちませんでした。一方、ProducerMetadata
最初のパラメーターが から まで異なることからわかるように、インスタンスは異なりますaddProducer
。これらは、他のメタデータの中でそれぞれの宛先キューの名前を提供します。
互いに競合するいくつかの暗黙的な Bean 定義が作成されているようです。
この例外を 2 つの s で解決するにはどうすればよいKafkaProducerContext
ですか?