0

Kafka + Storm + Trident + OpaqueTridentKafkaSpout でいくつかのパフォーマンスの問題が発生しています

以下に、セットアップの詳細を示します。

ストーム トポロジ:

Broker broker = Broker.fromString("localhost:9092")
    GlobalPartitionInformation info = new GlobalPartitionInformation()
    if(args[4]){
        int partitionCount = args[4].toInteger()
        for(int i =0;i<partitionCount;i++){
            info.addPartition(i, broker)
        }
    }
    StaticHosts hosts = new StaticHosts(info)
    TridentKafkaConfig tridentKafkaConfig = new TridentKafkaConfig(hosts,"test")
    tridentKafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme())


    OpaqueTridentKafkaSpout kafkaSpout = new OpaqueTridentKafkaSpout(tridentKafkaConfig)
    TridentTopology topology = new TridentTopology()
    Stream st  = topology.newStream("spout1", kafkaSpout).parallelismHint(args[2].toInteger())
            .each(kafkaSpout.getOutputFields(), new NEO4JTridentFunction(), new Fields("status"))
            .parallelismHint(args[1].toInteger())
    Map conf = new HashMap()
    conf.put(Config.TOPOLOGY_WORKERS, args[3].toInteger())
    conf.put(Config.TOPOLOGY_DEBUG, false)

    if (args[0] == "local") {
        LocalCluster cluster = new LocalCluster()
        cluster.submitTopology("mytopology", conf, topology.build())
    } else {
        StormSubmitter.submitTopology("mytopology", conf, topology.build())
        NEO4JTridentFunction.getGraphDatabaseService().shutdown()
    }

Storm に使用している Storm.yaml は次のとおりです。

########### These MUST be filled in for a storm configuration
storm.zookeeper.servers:
     - "localhost"
#     - "server2"
# 
storm.zookeeper.port : 2999


storm.local.dir: "/opt/mphrx/neo4j/stormdatadir"

nimbus.childopts: "-Xms2048m"
ui.childopts: "-Xms1024m"
logviewer.childopts: "-Xmx512m"
supervisor.childopts: "-Xms1024m"
worker.childopts: "-Xms2600m -Xss256k -XX:MaxPermSize=128m -XX:PermSize=96m
    -XX:NewSize=1000m -XX:MaxNewSize=1000m -XX:MaxTenuringThreshold=1 -XX:SurvivorRatio=6
    -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled
    -XX:CMSInitiatingOccupancyFraction=75 -XX:+UseCMSInitiatingOccupancyOnly
    -server -XX:+AggressiveOpts -XX:+UseCompressedOops -Djava.awt.headless=true -Djava.net.preferIPv4Stack=true
    -Xloggc:logs/gc-worker-%ID%.log -verbose:gc
    -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=1m
    -XX:+PrintGCDetails -XX:+PrintHeapAtGC -XX:+PrintGCTimeStamps -XX:+PrintClassHistogram
    -XX:+PrintTenuringDistribution -XX:-PrintGCApplicationStoppedTime -XX:-PrintGCApplicationConcurrentTime
    -XX:+PrintCommandLineFlags -XX:+PrintFlagsFinal"

java.library.path: "/usr/lib/jvm/jdk1.7.0_25"

supervisor.slots.ports:
    - 6700
    - 6701
    - 6702
    - 6703

topology.trident.batch.emit.interval.millis: 100
topology.message.timeout.secs: 300
#topology.max.spout.pending: 10000
  • Kafka で生成された各メッセージのサイズ: 11 KB
  • データを処理するための各ボルト(NEO4JTridentFunction)の実行時間: 500ms
  • ストーム ワーカーの数: 1
  • Spout(OpaqueTridentKafkaSpout) の並列処理のヒント: 1
  • Bolt/Function(NEO4JTridentFunction) の並列処理のヒント: 50

  • Spout から約 12msgs/秒のスループットが見られます。

  • Kafka で生成されるメッセージのレート: 150msgs/秒

Storm と Kafka はどちらも単一ノードの展開です。Storm からのはるかに高いスループットについて読みましたが、同じものを生成することはできません。より高いスループットを達成するために、Storm+ Kafka + OpaqueTridentKafkaSpout 構成を調整する方法を提案してください。この点での助けは、私たちを大いに助けてくれます。

ありがとう、

4

3 に答える 3

2

言及されたトピックのパーティション数と同じスパウト並列処理を設定する必要があります。デフォルトでは、トライデントは実行ごとに 1 つのバッチを受け入れtopology.max.spout.pendingます。プロパティを変更して、この数を増やす必要があります。Trident は順序付けられたトランザクション管理を強制するため、実行メソッド (NEO4JTridentFunction) は、目的のソリューションに到達するために高速である必要があります。

さらに、で遊ぶことができ"tridentConfig.fetchSizeBytes"ます。それを変更することで、スパウトでの新しい発行呼び出しごとに、より多くのデータを取り込むことができます。

また、ガベージ コレクションのログを確認する必要があります。これにより、実際のポイントについての手がかりが得られます。

"-XX:+PrintGCDetails -XX:+PrintGCTimeStamps -verbose:gc -Xloggc:{path}/gc-storm-worker-%ID%.log"ワーカー構成の worker.childopts 設定に , を追加することで、ガベージ コレクション ログを有効にできます。

最後になりましたが、若い世代の比率が通常のケースよりも高い場合は、G1GC を使用できます。

于 2016-03-17T14:55:00.413 に答える
0

私の計算: ボルトあたり 8 コアと 500MS の場合 -> ~16 メッセージ/秒。 ボルトを最適化すると、改善が見られます。

また、CPU バウンド ボルトの場合は、Parallelism ヒント = '合計コア数' を試して、 topology.trident.batch.emit.interval.millisをバッチ全体の処理にかかる時間を 2 で割った値に増やします。topology.max を設定します。 spout.pending を 1 にします。

于 2016-03-19T18:25:31.080 に答える
0

システム構成に基づいて worker.childopts を設定してください。SpoutConfig.fetchSizeBytes を使用して、トポロジに取り込まれるバイト数を増やします。並列処理のヒントを増やします。

于 2016-01-08T15:23:01.207 に答える