カスタム レシーバーを使用して Spark ストリーミング アプリケーションを作成しようとしています。事前定義された間隔でランダムな値を提供することにより、リアルタイムの入力データをシミュレートすることになっています。(単純化された) レシーバーは次のようになります。Spark Streaming アプリのコードは次のとおりです。
class SparkStreamingReceiver extends Actor with ActorHelper {
private val random = new Random()
override def preStart = {
context.system.scheduler.schedule(500 milliseconds, 1000 milliseconds)({
self ! ("string", random.nextGaussian())
})
}
override def receive = {
case data: (String, Double) => {
store[(String, Double)](data)
}
}
}
val conf: SparkConf = new SparkConf()
conf.setAppName("Spark Streaming App")
.setMaster("local")
val ssc: StreamingContext = new StreamingContext(conf, Seconds(2))
val randomValues: ReceiverInputDStream[(String, Double)] =
ssc.actorStream[(String,Double)](Props(new SparkStreamingReceiver()), "Receiver")
randomValues.saveAsTextFiles("<<OUTPUT_PATH>>/randomValues")
このコードを実行すると、レシーバーが機能していることがわかります (アイテムの保存、単一のログ エントリの受信)。ただし、saveAsTextFiles
値を出力することはありません。
マスターを 2 つのスレッドで実行するように変更することで問題を回避できます ( local[2]
) が、レシーバーの別のインスタンスを登録すると (これを行う予定です)、再び表示されます。より具体的には、出力を取得するには、登録されているカスタム レシーバーの数よりも多くのスレッドが少なくとも 1 つ必要です。
ワーカースレッドがレシーバーによって停止されているように思えます。
誰でもこの効果を説明できますか?おそらく私のコードを修正する方法はありますか?