2

私はこの単純なカフカストリームを持っています

val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)

// Each Kafka message is a flight
val flights = messages.map(_._2)

flights.foreachRDD( rdd => {
  println("--- New RDD with " + rdd.partitions.length + " partitions and " + rdd.count() + " flight records");
  rdd.map { flight => {        
    val flightRows = FlightParser.parse(flight)
    println ("Parsed num rows: " + flightRows)
    }
  }          
})

ssc.start()
ssc.awaitTermination()

Kafka にはメッセージがあり、Spark Streaming はそれらを RDD として取得できます。しかし、私のコードの 2 番目の println は何も出力しません。local[2] モードで実行したときはドライバー コンソール ログを調べ、yarn-client モードで実行したときは yarn ログを確認しました。

私は何が欠けていますか?

rdd.map の代わりに、次のコードが spark ドライバー コンソールに適切に出力されます。

for(flight <- rdd.collect().toArray) {
     val flightRows = FlightParser.parse(flight)
     println ("Parsed num rows: " + flightRows)
}

しかし、このフライト オブジェクトの処理は、executor ではなく、spark ドライバー プロジェクトで行われる可能性があります。私が間違っている場合は修正してください。

ありがとう

4

1 に答える 1

2

rdd.mapは遅延変換です。そのRDDでアクションが呼び出されない限り、具体化されません。
この特定のケースではrdd.foreach 、RDD の最も一般的なアクションの 1 つである which を使用して、RDD の各要素にアクセスできます。

flights.foreachRDD{ rdd => 
    rdd.foreach { flight =>        
        val flightRows = FlightParser.parse(flight)
        println ("Parsed num rows: " + flightRows) // prints on the stdout of each executor independently
    }
}

この RDD アクションがエグゼキュータで実行される場合、エグゼキュータの STDOUT に println 出力が見つかります。

代わりにドライバーにデータを出力したい場合は、クロージャーcollect内で RDD のデータを出力できます。DStream.foreachRDD

flights.foreachRDD{ rdd => 
  val allFlights = rdd.collect() 
  println(allFlights.mkString("\n")) // prints to the stdout of the driver
}
于 2016-04-12T10:29:05.097 に答える