次のコードがあるとします。
def f(x): (Array[Double], Array[Int])={
val data: Array[Double] //1D array
val index: Array[Int] //Data element's index
//Read data from a file into "data"
//Generate index (based on value "x") into "index"
(dset_datas, index)
}
sc.range(0, 10, 1, 10).flatMap.(x => (f(x)._1 zip f(x)._2))
質問:
1) 関数 f(x) は、フラットマップ内の x ごとに 2 回呼び出されますか? 最初に f(x)._1 を呼び出し、次に f(x)._2 を呼び出したので。
2) フラップマップ (特にデータ読み取り部分) は並行して実行されますか? 3 つのノードがあり、各ノードには 32 個のコアがあるとします。--num-executors=2 と --executor-cores=32 を設定しました。別のノードは、ドライバー ノードとして使用されます。
上記の質問に答えるために、Spark/Scala のドキュメントをたくさん検索しましたが、そこからは何の回答も得られませんでした。自分のシステムでコードを実行しようとしました。そのように見えます
1) データ パーティションが 2 回処理されることがわかったため、f(x) が 2 回呼び出されます。しかし、よくわかりません。
2) Spark ログ ファイル システムの下に 2 つの executor フォルダーが作成され、各 executor からの stdout もいくつかあることに気付きました。しかし、私もよくわかりません。
ありがとう !