2

Storm 1.1.2 と Kafka 0.11 を使用して、Docker コンテナーで起動する Java Spring アプリケーションを構築しています。

トポロジのすべてが計画どおりに機能しますが、Kafka からの負荷が高いと、時間の経過とともに Kafka の遅延がますます大きくなります。

私の KafkaSpoutConfig:

 KafkaSpoutConfig<String,String> spoutConf = 
     KafkaSpoutConfig.builder("kafkaContainerName:9092", "myTopic")
     .setProp(ConsumerConfig.GROUP_ID_CONFIG, "myGroup")
     .setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MyObjectDeserializer.class)
     .build()

次に、私のトポロジは次のとおりです

TopologyBuilder builder = new TopologyBuilder();

builder.setSpout("stormKafkaSpout", new KafkaSpout<String,String>(spoutConf), 25);

builder.setBolt("routerBolt", new RouterBolt(),25).shuffleGrouping("stormKafkaSpout");

Config conf = new Config();
conf.setNumWorkers(10);
conf.put(Config.STORM_ZOOKEEPER_SERVERS, ImmutableList.of("zookeeper"));
conf.put(Config.STORM_ZOOKEEPER_PORT, 2181);

conf.put(Config.NIMBUS_SEEDS, ImmutableList.of("nimbus"));
conf.put(Config.NIMBUS_THRIFT_PORT, 6627);

System.setProperty("storm.jar", "/opt/storm.jar");

StormSubmitter.submitTopology("topologyId", conf, builder.createTopology());

RouterBolt (BaseRichBolt を拡張) は、1 つの非常に単純な switch ステートメントを実行し、ローカルの KafkaProducer オブジェクトを使用して新しいメッセージを別のトピックに送信します。私が言ったように、すべてがコンパイルされ、トポロジは期待どおりに実行されますが、高負荷 (3000 メッセージ/秒) では、Kafka の遅延が積み重なり、トポロジのスループットが低下します。

ackを無効にしてみました

conf.setNumAckers(0);

conf.put(Config.TOPOLGY_ACKER_EXECUTORS, 0);

しかし、それは問題ではないと思います。

Storm UI で、RouterBolt の実行レイテンシが 1.2 ミリ秒で、高負荷時のプロセス レイテンシが 0.03 ミリ秒であることを確認しました。これにより、Spout がボトルネックであると思われます。また、25 個のパーティションがあるため、並列処理のヒントは 25 です。 「マイトピック」。ありがとう!

4

1 に答える 1