2

次の例を適応させようとしています: https://github.com/joshlong/spring-and-kafka

次のライブラリの最新の安定したバージョン:

org.apache.kafka > kafka_2.10 > 0.8.2.2
org.springframework.integration > spring-integration-kafka > 1.2.1.RELEASE
org.springframework.integration > spring-integration-java-dsl > 1.1.0.RELEASE

統合 DSL ライブラリは、おそらく新しい KafkaProducer の導入によって引き起こされたリファクタリングを経たようです。

私のプロデューサー設定のコードは次のとおりです。

@Bean(name = OUTBOUND_ID)
IntegrationFlow producer() {
    log.info("starting producer flow..");

    return flowDefinition -> {
        ProducerMetadata<String, String> getProducerMetadata = new ProducerMetadata<>(this.kafkaConfig.getTopic(),
                    String.class, String.class, new StringSerializer(), new StringSerializer());


        KafkaProducerMessageHandler kafkaProducerMessageHandler = Kafka.outboundChannelAdapter(props ->
                props.put("timeout.ms", "35000"))
                .messageKey(m -> m.getHeaders().get(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER))
                .addProducer(getProducerMetadata, this.kafkaConfig.getBrokerAddress())
                .get();

        flowDefinition
                .handle(kafkaProducerMessageHandler);
    };
}

メッセージ生成のコード:

@Bean
@DependsOn(OUTBOUND_ID)
CommandLineRunner kickOff(@Qualifier(OUTBOUND_ID + ".input") MessageChannel in) {
    return args -> {              
        for (int i = 0; i < 1000; i++) {
            in.send(MessageBuilder.withPayload("#" + i).setHeader(KafkaHeaders.TOPIC, this.kafkaConfig.getTopic()).build());
                log.info("sending message #" + i);
        }
    };
}

それは私が得る例外です:

Caused by: org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler#0]; nested exception is java.lang.NullPointerException
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:84)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:101)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:97)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:287)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:245)
at jc.DemoApplication$ProducerConfiguration.lambda$kickOff$0(DemoApplication.java:104)
at org.springframework.boot.SpringApplication.runCommandLineRunners(SpringApplication.java:673)
... 10 more
Caused by: java.lang.NullPointerException
at org.springframework.integration.kafka.support.KafkaProducerContext.getTopicConfiguration(KafkaProducerContext.java:67)
at org.springframework.integration.kafka.support.KafkaProducerContext.send(KafkaProducerContext.java:201)
at org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler.handleMessageInternal(KafkaProducerMessageHandler.java:88)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78)
... 18 more

更新:
完全な作業ソースは私のフォークにあります:
https://github.com/magiccrafter/spring-and-kafka

4

1 に答える 1

1

遅くなってすみません。

あなたの問題は、初期のIntegrationComponentSpecインスタンス化に関するものです:

KafkaProducerMessageHandler kafkaProducerMessageHandler = Kafka.outboundChannelAdapter(props ->
          props.put("timeout.ms", "35000"))
          .messageKey(m -> m.getHeaders().get(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER))
          .addProducer(getProducerMetadata, this.kafkaConfig.getBrokerAddress())
          .get();

自称してはいけません.get()。これを正しく解決できるのKafkaProducerMessageHandlerSpecComponentsRegistration、SI Java DSL だけです。そこにあるコードは次のようになります。

public Collection<Object> getComponentsToRegister() {
    this.kafkaProducerContext.setProducerConfigurations(this.producerConfigurations);
    return Collections.<Object>singleton(this.kafkaProducerContext);
}

このコードは呼び出されないため、this.producerConfigurationsは に取り込まれませんthis.kafkaProducerContext。いずれにせよ、最後のものは Bean として登録する必要があります。

したがって、問題を解決するには、IntegrationComponentSpecin DSL 定義のみを処理する必要があります。

を取得して、以下KafkaProducerMessageHandlerSpecに使用して.handle()ください。Kafka.outboundChannelAdapter()から直接使用できる場合、このオブジェクトを抽出する理由があるかどうかはわかりません.handle()

于 2015-10-20T22:15:40.677 に答える