1 つは同期アプローチで、もう 1 つは非同期方法で維持される 2 つのトピックがあるという要件があります。非同期はコンシューマー レコードの呼び出しで期待どおりに機能しますが、同期アプローチではコンシューマー コードが呼び出されません。
以下は、構成ファイルで宣言されたコードです
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9093");
props.put(ProducerConfig.RETRIES_CONFIG, 3);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
ここで autoFlush true を有効にしました
@Bean( name="KafkaPayloadSyncTemplate")
public KafkaTemplate<String, KafkaPayload> KafkaPayloadSyncTemplate() {
return new KafkaTemplate<String,KafkaPayload>(producerFactory(),true);
}
コントロールは、recordMetadataResults オブジェクトを返した後、消費者への呼び出しを行わずに停止します。
private List<RecordMetadata> sendPayloadToKafkaTopicInSync() throws InterruptedException, ExecutionException {
final List<RecordMetadata> recordMetadataResults = new ArrayList<RecordMetadata>();
KafkaPayload kafkaPayload = constructKafkaPayload();
ListenableFuture<SendResult<String,KafkaPayload>>
future = KafkaPayloadSyncTemplate.send(TestTopic, kafkaPayload);
SendResult<String, KafkaPayload> results;
results = future.get();
recordMetadataResults.add(results.getRecordMetadata());
return recordMetadataResults;
}
消費者コード
public class KafkaTestListener {
@Autowired
TestServiceImpl TestServiceImpl;
public final CountDownLatch countDownLatch = new CountDownLatch(1);
@KafkaListener(id="POC", topics = "TestTopic", group = "TestGroup")
public void listen(ConsumerRecord<String,KafkaPayload> record, Acknowledgment acknowledgment) {
countDownLatch.countDown();
TestServiceImpl.consumeKafkaMessage(record);
System.out.println("Acknowledgment : " + acknowledgment);
acknowledgment.acknowledge();
}
}
問題に基づいて、2 つの質問があります
- Sync Producer の場合、Listener クラス内で listen() を手動で呼び出す必要があります。はいの場合、その方法は?
- listener(
@KafkaListener
) が自動的に呼び出される場合、これを機能させるために他にどのようなセットアップ/構成を追加する必要がありますか。
事前に入力していただきありがとうございます
-スリカント