1

XD ストリームでは、メッセージはソース モジュールを介して Kafka トピックから消費され、シンク Kafka モジュールに送信されます。カスタム ソースおよびシンク Kafka モジュールを開発する理由は、正常に送信されたメッセージで、ダウンストリームのシンク モジュールから確認を取得した場合にのみ、ソース モジュールからのオフセットを更新したいからです。

Kafka 0.10.0.0 環境のトピックで Spring Integration Kafka 2.0.1.RELEASE および Spring Kafka 1.0.3.RELEASE を使用しています。私は次のことを試しました:

ソース モジュールの構成:

@Configuration
public class ModuleConfiguration {

    @Value("${topic}")
    private String topic;

    @Value("${brokerList}")
    private String brokerAddress;

    @Bean
    public SubscribableChannel output() {
        DirectChannel output = new DirectChannel();
        return output;
    }

    @Autowired
    TopicPartitionInitialOffset topicPartition;

    @Bean
    public TopicPartitionInitialOffset topicPartition(){
        return new TopicPartitionInitialOffset(this.topic, 0, (long) 0);    
    }

    @Bean
    public KafkaMessageListenerContainer<String, String> container() throws Exception {
        ContainerProperties containerProps = new ContainerProperties(topicPartition);
        containerProps.setAckMode(AckMode.MANUAL);
        KafkaMessageListenerContainer<String, String> kafkaMessageListenerContainer = new KafkaMessageListenerContainer<>(consumerFactory(),containerProps);
        return kafkaMessageListenerContainer;
    }
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 15000);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        DefaultKafkaConsumerFactory<String,String> consumerFactory =  new DefaultKafkaConsumerFactory<>(props);
        return consumerFactory;
    }
}

ソース モジュール: InboundKafkaMessageDrivenAdapter

@MessageEndpoint
@Import(ModuleConfiguration.class)
public class InboundKafkaMessageDrivenAdapter {

    @Autowired
    KafkaMessageListenerContainer<String, String> container;

    @Autowired
    SubscribableChannel output;

    @Bean
    public KafkaMessageDrivenChannelAdapter<String, String> adapter(KafkaMessageListenerContainer<String, String> container) {
        KafkaMessageDrivenChannelAdapter<String, String> kafkaMessageDrivenChannelAdapter = new KafkaMessageDrivenChannelAdapter<>(container);
        kafkaMessageDrivenChannelAdapter.setOutputChannel(output);
        return kafkaMessageDrivenChannelAdapter;
    }
}

シンク モジュール: 構成

@Configuration
@EnableIntegration
public class ModuleConfiguration {

    @Value("${topic}")
    private String topic;

    @Value("${brokerList}")
    private String brokerAddress;

    @Bean
    public KafkaProducerMessageHandler<String,String> handler() throws Exception {
        KafkaProducerMessageHandler<String, String> handler = new KafkaProducerMessageHandler<>(kafkaTemplate());
        handler.setTopicExpression(new LiteralExpression(this.topic));
        return handler;
    }

    @Bean
    public SubscribableChannel input() {
        return new DirectChannel();
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddress);
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(props);
    }
}

シンク モジュール: SinkActivator

@Import(ModuleConfiguration.class)
@MessageEndpoint
public class SinkActivator {

    @Autowired
    KafkaProducerMessageHandler<String,String> handler;

    @Autowired
    SubscribableChannel input;

    @ServiceActivator(inputChannel = "input")
    public void sendMessage(Message<?> msg) throws Exception{
            Acknowledgment acknowledgment = msg.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
            handler.handleMessage(msg);
            acknowledgment.acknowledge();
            }
}

ソースはメッセージを受信して​​シンクに送信することに成功していますが、シンクで確認応答を取得しようとすると:

承認 acknowledgment = msg.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);

次の例外がスローされます。

原因: java.lang.IllegalArgumentException: ヘッダー 'kafka_acknowledgment' に指定された型が正しくありません。[interface org.springframework.kafka.support.Acknowledgment] が期待されますが、実際の型は [class org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$ConsumerAcknowledgment] です

spring-integration-kafka-2.0.1.RELEASE のソース コードでは、AckMode=MANUAL の場合にクラス KafkaMessageListenerContainer がメッセージに kafka_acknowledgment ヘッダーが追加されますが、型は ConsumerAcknowledgement の内部静的クラスです。

では、ソースから送信されたメッセージでシンク モジュールから確認応答を取得するにはどうすればよいでしょうか。

4

1 に答える 1

0

ローカル トランスポートを使用していない限り、それはできません。これAcknowledgmentは「ライブ」オブジェクトであり、ワイヤを介して別のモジュールに送信することはできません。

ローカル トランスポートを使用している場合は機能しますが、各モジュールが独自のクラス ローダーで実行され、Acknowledgmentインターフェイスがクラスの異なるインスタンスであるため、クラス ローダーの問題が発生します。

クラスが共通のクラスローダーからロードされるように、spring-integration-kafka と spring-kafka を xd/lib フォルダーに移動する必要があります。

于 2016-08-19T16:11:13.093 に答える