私は遊んでいてKafkaStreams
、KafkaConnect
単にトピックからのメッセージを消費しようとしています。このトピック用に「標準」のバッチ コンシューマーをセットアップしましたが、これは魅力的に機能します。最初にいくつかのレコードを Kafka に送信し、後でそれらを使用します。今、私はKakfaストリームを使用して同じことをしたいのですが、トピックから単一のメッセージを受け取りません. これが私が使用している消費者コードです。
final int NUMBER_OF_PARTITIONS = 4;
final Properties consumerConfig = new Properties();
consumerConfig.setProperty("zookeeper.connect", RULE.getConfiguration().kafka.getZookeeperUrl());
consumerConfig.setProperty("backoff.increment.ms", "100");
consumerConfig.setProperty("group.id", "java-consumer-example");
consumerConfig.setProperty("consumer.timeout.ms", "1000000");
consumerConfig.setProperty("client.id", "someclient");
consumerConfig.setProperty("auto.offset.reset", "smallest");
consumerConfig.setProperty("enable.auto.commit", "false");
consumerConfig.setProperty("bootstrap.servers", RULE.getConfiguration().kafka.getHosts());
final ConsumerConnector connector = Consumer.createJavaConsumerConnector(new ConsumerConfig(consumerConfig));
final TopicFilter sourceTopicFilter = new Whitelist(RULE.getConfiguration().kafka.getTopic());
final VerifiableProperties decoderProps = new VerifiableProperties();
decoderProps.props().setProperty("schema.registry.url", RULE.getConfiguration().kafka.getRegistry());
decoderProps.props().setProperty("max.schemas.per.subject", "1");
final List<KafkaStream<String, Object>> streams = connector
.createMessageStreamsByFilter(sourceTopicFilter, NUMBER_OF_PARTITIONS, new StringDecoder(decoderProps), new KafkaAvroDecoder(decoderProps));
final ExecutorService executorService = Executors.newFixedThreadPool(NUMBER_OF_PARTITIONS);
for (final KafkaStream stream : streams) {
executorService.submit(() -> {
try {
final ConsumerIterator it = stream.iterator();
while (it.hasNext()) {
final MessageAndMetadata messageAndMetadata = it.next();
final String key = (String) messageAndMetadata.key();
System.out.println("KEY" + key);
}
} catch (final Exception ex) {
LOGGER.error("ERROR", ex);
}
});
}
私の問題は、私のコードがit.hasNext()
タイムアウトに達するまで待機し続けることです。ここで詳細が欠けている可能性がありますが、なぜこのトピックから何も得られないのかわかりません。このテストの一環として、コンシューマが開始する直前にこのトピックに多数のレコードを送信するプロデューサーがあるため、オフセットの問題になることはありません。どんなアイデアでも大歓迎です。