1

Kafka から継続的にデータを受信できるステートフルな wordCount スパーク ストリーミング アプリケーションを作成します。私のコードにはmapWithState関数が含まれており、正しく実行できます。Spark UI でストリーミング統計を確認すると、処理時間チャートに周期的なパルスがいくつか見つかりました。これはcheckpointの使用が原因である可能性があると思います。誰かがこれを説明してくれることを願っています。

ストリーミング統計

および完成したバッチ テーブル:

バッチ処理時間

時間コストが 1 秒のバッチが定期的に発生することがわかりました。次に、1 秒時間コストのバッチと 1 秒未満の時間コスト バッチに足を踏み入れたところ、1 秒時間コストのバッチには 1 つ多くのジョブがあり、もう 1 つのジョブがあることがわかりました。

2 種類のバッチの比較: 1 秒時間コストのバッチ 1 秒未満のコストのバッチ

が原因のようcheckpointですが、よくわかりません。

誰か詳しく説明してくれませんか? ありがとう!

これが私のコードです:

import kafka.serializer.StringDecoder 
import org.apache.spark.streaming._ 
import org.apache.spark.streaming.kafka._ 
import org.apache.spark.SparkConf 

object StateApp {

  def main(args: Array[String]) {

    if (args.length < 4) {
      System.err.println(
        s"""
           |Usage: KafkaSpark_008_test <brokers> <topics> <batchDuration>
           |  <brokers> is a list of one or more Kafka brokers
           |  <topics> is a list of one or more kafka topics to consume from
           |  <batchDuration> is the batch duration of spark streaming
           |  <checkpointPath> is the checkpoint directory
        """.stripMargin)
      System.exit(1)
    }

    val Array(brokers, topics, bd, cpp) = args

    // Create context with 2 second batch interval
    val sparkConf = new SparkConf().setAppName("KafkaSpark_080_test")
    val ssc = new StreamingContext(sparkConf, Seconds(bd.toInt))

    ssc.checkpoint(cpp)

    // Create direct kafka stream with brokers and topics
    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet)

    // test the messages' receiving speed
    messages.foreachRDD(rdd =>
      println(System.currentTimeMillis() + "\t" + System.currentTimeMillis() / 1000 + "\t" + (rdd.count() / bd.toInt).toString))

    // the messages' value type is "timestamp port word", eg. "1479700000000 10105 ABC"
    // wordDstream: (word, 1), eg. (ABC, 1)
    val wordDstream = messages.map(_._2).map(msg => (msg.split(" ")(2), 1))

    // this is from Spark Source Code example in Streaming/StatefulNetworkWordCount.scala
    val mappingFunc = (word: String, one: Option[Int], state: State[Int]) => {
      val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
      val output = (word, sum)
      state.update(sum)
      output
    }

    val stateDstream = wordDstream.mapWithState(
      StateSpec.function(mappingFunc)).print()

    // Start the computation
    ssc.start()
    ssc.awaitTermination()   }

}
4

1 に答える 1

1

これらの小さなスパイクは、データを永続ストレージにチェックポイントすることによって発生します。Spark が状態の完全な変換を行うには、定義された間隔ごとにデータを確実に保存して、障害が発生した場合に回復できるようにする必要があります。

スパイクは 50 秒ごとに実行されるため、時間的に一貫していることに注意してください。この計算は次のとおりです: ( batch time * default multiplier)、現在のデフォルトの乗数は 10 です。あなたの場合、これは5 * 10 = 50スパイクが 50 秒ごとに表示される理由を説明しています。

于 2016-12-05T07:38:09.573 に答える