3

kafka-storm を統合しようとしていました。いくつかの例から始めました。

GitHub からサンプルを実行できました。次に、KAFKA PRODUCER API を使用して kafka トピックにメッセージを発行するために、Eclipse で Producer クラスを作成しようとしています。

シナリオ 1:

トピック テストと言うテストを使用してコンシューマー シェルが実行されている場合、プロデューサー クラスを実行します。公開されたすべてのメッセージを含むコンシューマー シェルを確認できます。

シナリオ2

コンシューマーシェルを開始していません (コンシューマーがダウンしているとします)。そして、プロデューサークラスを実行します。メッセージはカフカに公開されています。

メッセージが発行された場合、ダウンタイム後にコンシューマー シェルを起動すると、既に発行されたトピックのメッセージが読み取られません。

なんで?トピック消費のログを保持していると思います。メッセージを読むべきではありませんか?

言及する必要がある構成パラメーターはありますか?

Properties props = new Properties();
props.put("metadata.broker.list", "localhost:9092");
props.put("zk.connect", "localhost:2181");
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("request.required.acks", "1");

ProducerConfig config = new ProducerConfig(props);
Producer<String, String> producer = new Producer<String, String>(config);

        for ( int nEvents=0; nEvents<events;nEvents++)
        {
          String ip="192.168.2."+rnd.nextInt(255);
          String msg=getNextTradeData(); // Class to generate data
          KeyedMessage<String,String> data=new KeyedMessage<String, String>("TradeFrequency",ip,msg);
          Thread.sleep(100);
          System.out.println(msg);
          producer.send(data);  

        }
producer.close();

}

または、消費者を変えるために何かする必要がありますか。パッケージで提供されているコンシューマーシェルを使用しており、それを使用して開始しています

  bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic first-topic
4

1 に答える 1