Kafka サイトの ConsumerGroupExample コードを使用して、Kafka High Level Consumer をテストしています。Kafka サーバー構成にある「test」というトピックに関する既存のメッセージをすべて取得したいと思います。他のブログを見ると、auto.offset.reset を「最小」に設定して、すべてのメッセージを取得できるようにする必要があります。
private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
Properties props = new Properties();
props.put("zookeeper.connect", a_zookeeper);
props.put("group.id", a_groupId);
props.put("auto.offset.reset", "smallest");
props.put("zookeeper.session.timeout.ms", "10000");
return new ConsumerConfig(props);
}
私が実際に持っている質問は次のとおりです。高レベルのコンシューマーに対する同等の Java API 呼び出しは何ですか。
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning