Storm 1.1.2 と Kafka 0.11 を使用して、Docker コンテナーで起動する Java Spring アプリケーションを構築しています。
トポロジのすべてが計画どおりに機能しますが、Kafka からの負荷が高いと、時間の経過とともに Kafka の遅延がますます大きくなります。
私の KafkaSpoutConfig:
KafkaSpoutConfig<String,String> spoutConf =
KafkaSpoutConfig.builder("kafkaContainerName:9092", "myTopic")
.setProp(ConsumerConfig.GROUP_ID_CONFIG, "myGroup")
.setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MyObjectDeserializer.class)
.build()
次に、私のトポロジは次のとおりです
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("stormKafkaSpout", new KafkaSpout<String,String>(spoutConf), 25);
builder.setBolt("routerBolt", new RouterBolt(),25).shuffleGrouping("stormKafkaSpout");
Config conf = new Config();
conf.setNumWorkers(10);
conf.put(Config.STORM_ZOOKEEPER_SERVERS, ImmutableList.of("zookeeper"));
conf.put(Config.STORM_ZOOKEEPER_PORT, 2181);
conf.put(Config.NIMBUS_SEEDS, ImmutableList.of("nimbus"));
conf.put(Config.NIMBUS_THRIFT_PORT, 6627);
System.setProperty("storm.jar", "/opt/storm.jar");
StormSubmitter.submitTopology("topologyId", conf, builder.createTopology());
RouterBolt (BaseRichBolt を拡張) は、1 つの非常に単純な switch ステートメントを実行し、ローカルの KafkaProducer オブジェクトを使用して新しいメッセージを別のトピックに送信します。私が言ったように、すべてがコンパイルされ、トポロジは期待どおりに実行されますが、高負荷 (3000 メッセージ/秒) では、Kafka の遅延が積み重なり、トポロジのスループットが低下します。
ackを無効にしてみました
conf.setNumAckers(0);
と
conf.put(Config.TOPOLGY_ACKER_EXECUTORS, 0);
しかし、それは問題ではないと思います。
Storm UI で、RouterBolt の実行レイテンシが 1.2 ミリ秒で、高負荷時のプロセス レイテンシが 0.03 ミリ秒であることを確認しました。これにより、Spout がボトルネックであると思われます。また、25 個のパーティションがあるため、並列処理のヒントは 25 です。 「マイトピック」。ありがとう!