2

Kafka Java API を使用してメッセージを消費しようとしています。kafka-console-consumer.bat を使用してメッセージを消費できます。ただし、Java API からのメッセージを消費することはできません。エラーやメッセージが表示されません。何が間違っているのか教えてください。

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

public class SimpleConsumer {
    private final ConsumerConnector consumer;
    private final String topic;

    public SimpleConsumer(String zookeeper, String groupId, String topic) {
        Properties props = new Properties();
        props.put("zookeeper.connect", zookeeper);
        props.put("group.id", groupId);
        props.put("zookeeper.session.timeout.ms", "500");
        props.put("zookeeper.sync.time.ms", "250");
        props.put("auto.commit.interval.ms", "1000");

        consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        this.topic = topic;
    }

    public void testConsumer() {
        Map<String, Integer> topicCount = new HashMap<String, Integer>();
        topicCount.put(topic, 1);
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumer.createMessageStreams(topicCount);
        List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic);
        for (final KafkaStream stream : streams) {
            ConsumerIterator<byte[], byte[]> it = stream.iterator();
            while (it.hasNext()) {
                System.out.println("Message from Single Topic: " + new String(it.next().message()));
            }
        }
        if (consumer != null) {
            consumer.shutdown();
        }
    }

    public static void main(String[] args) {
        String topic = "test";
        SimpleConsumer simpleHLConsumer = new SimpleConsumer("localhost:2181", "testgroup", topic);
        simpleHLConsumer.testConsumer();
    }
}

コンソール コマンド

kafka-console-consumer.bat --zookeeper localhost:2181 --topic test --from-beginning

作成されたトピック:

kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic test

次を使用してメッセージを発行します。

kafka-console-producer.bat --broker-list localhost:9092 --topic test

コンシューマー プログラムを実行し、ブローカー コンソールでログを取得すると、次のようになります。

[2015-12-29 11:57:34,448] 情報 /IP (kafka.network.Processor) へのソケット接続を閉じています。

このログを取得するプログラムを閉じると:

java.io.IOException: 既存の接続がリモート ホストによって強制的に閉じられました

上記のプログラムからメッセージを消費できない理由を教えてください。

ただし、を使用してメッセージを消費できます

kafka-console-consumer.bat --zookeeper localhost:2181 --topic test --from-beginning 

助けて。

4

1 に答える 1

3

最初からメッセージを読みたい場合は、オプションを設定する必要があります

auto.offset.reset=smallest

デフォルトでは「最大」です。

http://kafka.apache.org/documentation.html

ZooKeeper に初期オフセットがない場合、またはオフセットが範囲外の場合の対処方法:

  • 最小 : オフセットを最小のオフセットに自動的にリセットします
  • maximum : オフセットを最大オフセットに自動的にリセットします
  • その他: 消費者に例外をスローします

注: 新しいコンシューマ API のこのオプション (0.9.0.0 以降):

auto.offset.reset=earliest|latest|none

group.id のオフセットが Zookeeper に保存されました。したがって、メッセージが group.id を変更したり、zookeeper をクリーンアップしたりしたい場合は、

于 2015-12-28T08:03:27.703 に答える