-1

1 つは同期アプローチで、もう 1 つは非同期方法で維持される 2 つのトピックがあるという要件があります。非同期はコンシューマー レコードの呼び出しで期待どおりに機能しますが、同期アプローチではコンシューマー コードが呼び出されません。

以下は、構成ファイルで宣言されたコードです

 props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9093");
 props.put(ProducerConfig.RETRIES_CONFIG, 3);
 props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
 props.put(ProducerConfig.ACKS_CONFIG, "all");
 props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
 props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);

ここで autoFlush true を有効にしました

 @Bean( name="KafkaPayloadSyncTemplate")
    public KafkaTemplate<String, KafkaPayload> KafkaPayloadSyncTemplate() {
        return new KafkaTemplate<String,KafkaPayload>(producerFactory(),true);
 }

コントロールは、recordMetadataResults オブジェクトを返した後、消費者への呼び出しを行わずに停止します。

  private List<RecordMetadata> sendPayloadToKafkaTopicInSync() throws   InterruptedException, ExecutionException {      
        final List<RecordMetadata> recordMetadataResults = new ArrayList<RecordMetadata>();
        KafkaPayload kafkaPayload = constructKafkaPayload();
        ListenableFuture<SendResult<String,KafkaPayload>> 
future = KafkaPayloadSyncTemplate.send(TestTopic, kafkaPayload);
        SendResult<String, KafkaPayload> results;
        results = future.get();
        recordMetadataResults.add(results.getRecordMetadata());     
        return recordMetadataResults;           
    }

消費者コード

public class KafkaTestListener {    
    @Autowired
    TestServiceImpl TestServiceImpl;    
    public final CountDownLatch countDownLatch = new CountDownLatch(1); 
    @KafkaListener(id="POC", topics = "TestTopic", group = "TestGroup")
    public void listen(ConsumerRecord<String,KafkaPayload> record, Acknowledgment acknowledgment) {
        countDownLatch.countDown();     
        TestServiceImpl.consumeKafkaMessage(record);        
        System.out.println("Acknowledgment : " + acknowledgment);
        acknowledgment.acknowledge();       
    }
}

問題に基づいて、2 つの質問があります

  1. Sync Producer の場合、Listener クラス内で listen() を手動で呼び出す必要があります。はいの場合、その方法は?
  2. listener( @KafkaListener) が自動的に呼び出される場合、これを機能させるために他にどのようなセットアップ/構成を追加する必要がありますか。

事前に入力していただきありがとうございます

-スリカント

4

1 に答える 1

1

consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");コンシューマ プロパティには必ず使用してください。

同期/非同期について何を意味するのかわかりませんが、生産と消費は完全に区別された操作です。また、プロデューサー側からコンシューマーに影響を与えることはできません。間に Kafka Broker があるからです。

于 2017-02-21T17:45:48.970 に答える