0

次のコードがあるとします。

def f(x): (Array[Double], Array[Int])={
    val   data: Array[Double]   //1D array
    val   index: Array[Int]    //Data element's index

    //Read data from a file into "data"
    //Generate index (based on value "x") into "index"

    (dset_datas, index)
}

sc.range(0, 10, 1, 10).flatMap.(x => (f(x)._1 zip f(x)._2))

質問:

1) 関数 f(x) は、フラットマップ内の x ごとに 2 回呼び出されますか? 最初に f(x)._1 を呼び出し、次に f(x)._2 を呼び出したので。

2) フラップマップ (特にデータ読み取り部分) は並行して実行されますか? 3 つのノードがあり、各ノードには 32 個のコアがあるとします。--num-executors=2 と --executor-cores=32 を設定しました。別のノードは、ドライバー ノードとして使用されます。

上記の質問に答えるために、Spark/Scala のドキュメントをたくさん検索しましたが、そこからは何の回答も得られませんでした。自分のシステムでコードを実行しようとしました。そのように見えます

1) データ パーティションが 2 回処理されることがわかったため、f(x) が 2 回呼び出されます。しかし、よくわかりません。

2) Spark ログ ファイル システムの下に 2 つの executor フォルダーが作成され、各 executor からの stdout もいくつかあることに気付きました。しかし、私もよくわかりません。

ありがとう !

4

1 に答える 1

0

1)関数リテラルで2回呼び出されるため、すべてのワーカーはf(x)2回実行されます-結果のタプルの異なる要素を抽出するたびに。

range2)メソッドの最後のパラメーターはです10。これは、範囲 RDD に 10 個のパーティションがあることを意味します。これは、その並列実行の上限flatMapが 10 であることを意味します (10 個のエグゼキューターがある場合flatMap、それらのエグゼキューターごとに並列実行できます)。2 つのエグゼキューターがあるためflatMap、並列で実行されますが、それらの 2 つのエグゼキューターでのみ実行されます。

于 2016-06-03T19:27:35.003 に答える