7

Kafka サイトの ConsumerGroupExample コードを使用して、Kafka High Level Consumer をテストしています。Kafka サーバー構成にある「test」というトピックに関する既存のメッセージをすべて取得したいと思います。他のブログを見ると、auto.offset.reset を「最小」に設定して、すべてのメッセージを取得できるようにする必要があります。

private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId)    {
    Properties props = new Properties();
    props.put("zookeeper.connect", a_zookeeper);
    props.put("group.id", a_groupId);
    props.put("auto.offset.reset", "smallest");
    props.put("zookeeper.session.timeout.ms", "10000");     

    return new ConsumerConfig(props);
}

私が実際に持っている質問は次のとおりです。高レベルのコンシューマーに対する同等の Java API 呼び出しは何ですか。

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

4

4 に答える 4

7

基本的に、新しいコンシューマーがトピックを消費しようとするたびに、最初からメッセージを読み取ります。特に、テスト目的で毎回最初から消費するだけの場合は、コンシューマーを新しい groupID で初期化するたびに、最初からメッセージが読み取られます。これが私がやった方法です:

properties.put("group.id", UUID.randomUUID().toString());

毎回最初からメッセージを読んでください!

于 2015-11-19T07:28:43.747 に答える
2

最初からメッセージを取得するには、次のようにします。

import kafka.utils.ZkUtils;
ZkUtils.maybeDeletePath("zkhost:zkport", "/consumers/group.id");

あとはルーティンワークをこなすだけ…

于 2015-12-09T04:14:21.723 に答える