Kafka を Storm と統合しようとしています。Kafka Spout を使用して Kafka トピックからデータを取得し、それをさらに処理するために Storm Bolt に供給しています。トポロジーを正常に送信できますが、Spout はデータを出力していません。エラーもスローしません。私は Kafka と Storm に非常に慣れていないため、この問題の背後にある理由を理解できません。変更を提案してください。事前に感謝します!!
私のトポロジ:
public class TopologyMain {
private static final String SENTENCE_SPOUT_ID = "kafka-sentence-spout";
public static void main(String[] args) throws InterruptedException, AlreadyAliveException, InvalidTopologyException {
int numSpoutExecutors = 1;
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout(SENTENCE_SPOUT_ID, buildKafkaSentenceSpout(), numSpoutExecutors);
builder.setBolt("word-normalizer", new WordNormalizer())
.shuffleGrouping(SENTENCE_SPOUT_ID);
builder.setBolt("word-counter", new WordCounter(),2)
.shuffleGrouping("word-normalizer");
//Configuration
Config conf = new Config();
conf.setDebug(false);
//Topology run
conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
conf.put(Config.NIMBUS_HOST, "192.168.1.229");
conf.put(Config.NIMBUS_THRIFT_PORT, 6627);
System.setProperty("storm.jar", "/home/ubuntu/st/stIn/target/storm-wc.jar");
StormSubmitter.submitTopology("Count-Word-Topology", conf,builder.createTopology());
}
private static KafkaSpout buildKafkaSentenceSpout() {
BrokerHosts hosts = new ZkHosts("localhost:2181");
SpoutConfig spoutConfig = new SpoutConfig(hosts, "test", "/acking-kafka-sentence-spout", "acking-sentence-spout");
spoutConfig.forceFromStart = true;
spoutConfig.startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
return new KafkaSpout(spoutConfig);
}
}