1

同じデータがそれぞれのキューに向かう途中で異なる変換を経た後、Spring Integration を使用して、1 つのチャネルから 2 つの異なる Kafka キューにデータを送信しようとしています。問題は、プロデューサー コンテキストが重複しているようで、その理由がわかりません。

ここに私のフロー構成があります:

flow -> flow
        .channel(“firstChannel")
        .publishSubscribeChannel(Executors.newCachedThreadPool(), s -> s
                .subscribe(f -> f
                                .transform(firstTransformer::transform)
                                .channel(MessageChannels.queue(50))
                                .handle(Kafka.outboundChannelAdapter(kafkaConfig)
                                        .addProducer(firstMetadata(), brokerAddress), e -> e.id(“firstKafkaOutboundChannelAdapter")
                                        .autoStartup(true)
                                        .poller(p -> p.fixedDelay(1000, TimeUnit.MILLISECONDS).receiveTimeout(0).taskExecutor(taskExecutor))
                                        .get())
                )
                .subscribe(f -> f
                                .transform(secondTransformer::transform)
                                .channel(MessageChannels.queue(50))
                                .handle(Kafka.outboundChannelAdapter(kafkaConfig)
                                        .addProducer(secondMetadata(), brokerAddress), e -> e.id(“secondKafkaOutboundChannelAdapter")
                                        .autoStartup(true)
                                        .poller(p -> p.fixedDelay(1000, TimeUnit.MILLISECONDS).receiveTimeout(0).taskExecutor(taskExecutor))
                                        .get())
                ));

例外は次のとおりです。

オブジェクト [org.springframework.integration.kafka.support.KafkaProducerContext@3163987e] を Bean 名 'not_specified' で登録できませんでした: 既にオブジェクト [org.springframework.integration.kafka.support.KafkaProducerContext@15f193b8] がバインドされています

kafkaConfigさまざまなオブジェクトを使用してみましたが、役に立ちませんでした。一方、ProducerMetadata最初のパラメーターが から まで異なることからわかるように、インスタンスは異なりますaddProducer。これらは、他のメタデータの中でそれぞれの宛先キューの名前を提供します。

互いに競合するいくつかの暗黙的な Bean 定義が作成されているようです。

この例外を 2 つの s で解決するにはどうすればよいKafkaProducerContextですか?

4

1 に答える 1

1

.get()それらで使用するのではなくKafkaProducerMessageHandlerSpec、Framework に環境を解決させてください。

問題は、次のことKafkaProducerMessageHandlerSpec implements ComponentsRegistrationを気にする団体がないためです。

public Collection<Object> getComponentsToRegister() {
    this.kafkaProducerContext.setProducerConfigurations(this.producerConfigurations);
    return Collections.<Object>singleton(this.kafkaProducerContext);
}

手動.get()呼び出し後。

私は同意します、これは多少の不便であり、最終アプリケーションのためのより良い解決策を見つける必要がありますがSpecKafka.outboundChannelAdapter().

私がはっきりしていることを願っています。

アップデート

わかりました、それは間違いなく私たちの問題です。そして、すぐに修正します: https://jira.spring.io/browse/INTEXT-216 https://jira.spring.io/browse/INTEXT-217

一方、回避策は次のとおりです。

 KafkaProducerContext kafkaProducerContext = (KafkaProducerContext) kafkaProducerMessageHandlerSpec.getComponentsToRegister().iterator().next();
 kafkaProducerContext.setBeanName(null);

どこに移動するべきか

Kafka.outboundChannelAdapter(kafkaConfig)
                                    .addProducer(firstMetadata(), brokerAddress)

privateそれにアクセスするための別のメソッドにkafkaProducerContext

于 2016-01-05T22:32:50.787 に答える