1

Kafka を Storm と統合しようとしています。Kafka Spout を使用して Kafka トピックからデータを取得し、それをさらに処理するために Storm Bolt に供給しています。トポロジーを正常に送信できますが、Spout はデータを出力していません。エラーもスローしません。私は Kafka と Storm に非常に慣れていないため、この問題の背後にある理由を理解できません。変更を提案してください。事前に感謝します!!

トポロジを送信した後の Storm UI のスクリーン ショット

私のトポロジ:

public class TopologyMain {

 private static final String SENTENCE_SPOUT_ID = "kafka-sentence-spout";


public static void main(String[] args) throws InterruptedException, AlreadyAliveException, InvalidTopologyException {
    int numSpoutExecutors = 1;


    TopologyBuilder builder = new TopologyBuilder();

    builder.setSpout(SENTENCE_SPOUT_ID, buildKafkaSentenceSpout(), numSpoutExecutors);
    builder.setBolt("word-normalizer", new WordNormalizer())
        .shuffleGrouping(SENTENCE_SPOUT_ID);
    builder.setBolt("word-counter", new WordCounter(),2)
        .shuffleGrouping("word-normalizer");

    //Configuration
    Config conf = new Config();
    conf.setDebug(false);
    //Topology run
    conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
    conf.put(Config.NIMBUS_HOST, "192.168.1.229");
    conf.put(Config.NIMBUS_THRIFT_PORT, 6627);
    System.setProperty("storm.jar", "/home/ubuntu/st/stIn/target/storm-wc.jar");
    StormSubmitter.submitTopology("Count-Word-Topology", conf,builder.createTopology());

}



 private static KafkaSpout buildKafkaSentenceSpout() {
      BrokerHosts hosts = new ZkHosts("localhost:2181");
      SpoutConfig spoutConfig = new SpoutConfig(hosts, "test", "/acking-kafka-sentence-spout", "acking-sentence-spout");
      spoutConfig.forceFromStart = true;
      spoutConfig.startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
      return new KafkaSpout(spoutConfig);
    }
 }
4

1 に答える 1