kafka-storm を統合しようとしていました。いくつかの例から始めました。
GitHub からサンプルを実行できました。次に、KAFKA PRODUCER API を使用して kafka トピックにメッセージを発行するために、Eclipse で Producer クラスを作成しようとしています。
シナリオ 1:
トピック テストと言うテストを使用してコンシューマー シェルが実行されている場合、プロデューサー クラスを実行します。公開されたすべてのメッセージを含むコンシューマー シェルを確認できます。
シナリオ2
コンシューマーシェルを開始していません (コンシューマーがダウンしているとします)。そして、プロデューサークラスを実行します。メッセージはカフカに公開されています。
メッセージが発行された場合、ダウンタイム後にコンシューマー シェルを起動すると、既に発行されたトピックのメッセージが読み取られません。
なんで?トピック消費のログを保持していると思います。メッセージを読むべきではありませんか?
言及する必要がある構成パラメーターはありますか?
Properties props = new Properties();
props.put("metadata.broker.list", "localhost:9092");
props.put("zk.connect", "localhost:2181");
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("request.required.acks", "1");
ProducerConfig config = new ProducerConfig(props);
Producer<String, String> producer = new Producer<String, String>(config);
for ( int nEvents=0; nEvents<events;nEvents++)
{
String ip="192.168.2."+rnd.nextInt(255);
String msg=getNextTradeData(); // Class to generate data
KeyedMessage<String,String> data=new KeyedMessage<String, String>("TradeFrequency",ip,msg);
Thread.sleep(100);
System.out.println(msg);
producer.send(data);
}
producer.close();
}
または、消費者を変えるために何かする必要がありますか。パッケージで提供されているコンシューマーシェルを使用しており、それを使用して開始しています
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic first-topic