私はこの単純なカフカストリームを持っています
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
// Each Kafka message is a flight
val flights = messages.map(_._2)
flights.foreachRDD( rdd => {
println("--- New RDD with " + rdd.partitions.length + " partitions and " + rdd.count() + " flight records");
rdd.map { flight => {
val flightRows = FlightParser.parse(flight)
println ("Parsed num rows: " + flightRows)
}
}
})
ssc.start()
ssc.awaitTermination()
Kafka にはメッセージがあり、Spark Streaming はそれらを RDD として取得できます。しかし、私のコードの 2 番目の println は何も出力しません。local[2] モードで実行したときはドライバー コンソール ログを調べ、yarn-client モードで実行したときは yarn ログを確認しました。
私は何が欠けていますか?
rdd.map の代わりに、次のコードが spark ドライバー コンソールに適切に出力されます。
for(flight <- rdd.collect().toArray) {
val flightRows = FlightParser.parse(flight)
println ("Parsed num rows: " + flightRows)
}
しかし、このフライト オブジェクトの処理は、executor ではなく、spark ドライバー プロジェクトで行われる可能性があります。私が間違っている場合は修正してください。
ありがとう