4

rdds からの結果を配列に収集して送り返すドライバープログラムにこの関数があります。ただし、RDD (dstream 内) にはデータがありますが、関数は空の配列を返しています...何が間違っていますか?

def runTopFunction() : Array[(String, Int)] = {
        val topSearches = some function....
        val summary = new ArrayBuffer[(String,Int)]()
        topSearches.foreachRDD(rdd => {
            summary = summary.++(rdd.collect())
        })    

    return summary.toArray
}
4

2 に答える 2

1

DStream.forEachRDD指定されたアクションでDStreamあり、各ストリーミング バッチ間隔で実行されるようにスケジュールされます。後で実行されるジョブの宣言的な構造です。

Dstream.forEachRDD が「反復ごとにこれを行う」と言っているだけで、周囲の累積コードがすぐに実行され、空の配列になるため、この方法での値の累積はサポートされていません。

計算後にデータに何が起こるかに応じて、summaryこれを実装する方法にはいくつかのオプションがあります。

  • データを別のプロセスで取得する必要がある場合は、スレッドセーフな共有構造を使用してください。プライオリティ キューは、上位 k の用途に最適です。
  • データが保存される場合 (fs、db)、topSearches関数を dstream に適用した後、ストレージに書き込むことができます。
于 2015-02-27T10:10:00.957 に答える
1

したがって、foreachRDDは目的の処理を実行しますが、ノンブロッキングでもあります。つまり、すべてのストリームが処理されるまで待機しません。toArrayへの呼び出しの直後にバッファーを呼び出しているためforeachRDD、まだ処理された要素はありません。

于 2015-02-26T00:46:37.133 に答える