各マシンに 9 コア / 80G (合計 27 コア / 240G RAM) を備えたローカル マシンにスタンドアロンの Spark (2.1.1) クラスターを作成しました。
1 から x までのすべての数値を合計するサンプル Spark ジョブがあります。これがコードです。
package com.example
import org.apache.spark.sql.SparkSession
object ExampleMain {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder
.master("spark://192.168.1.2:7077")
.config("spark.driver.maxResultSize" ,"3g")
.appName("ExampleApp")
.getOrCreate()
val sc = spark.SparkContext
val rdd = sc.parallelize(Lisst.range(1, 1000))
val sum = rdd.reduce((a,b) => a+b)
println(sum)
done
}
def done = {
println("\n\n")
println("-------- DONE --------")
}
}
上記のコードを実行すると、数秒後に結果が得られるので、コードをクランチして、1 から 1B (1,000,000,000) までのすべての数値を合計すると、GC のオーバーヘッド制限に達します。
十分なメモリがない場合、スパークはメモリを HDD にスピルする必要があることを読みました。クラスター構成で遊んでみましたが、役に立ちませんでした。
Driver memory = 6G
Number of workers = 24
Cores per worker = 1
Memory per worker = 10
私は開発者ではなく、Scala の知識もありませんが、GC の問題なしでこのコードを実行するための解決策を見つけたいと考えています。
@philantrovert リクエストごとに、spark-submit コマンドを追加しています
/opt/spark-2.1.1/bin/spark-submit \
--class "com.example.ExampleMain" \
--master spark://192.168.1.2:6066 \
--deploy-mode cluster \
/mnt/spark-share/example_2.11-1.0.jar
さらに、私のspark/confは次のとおりです:
- スレーブ ファイルには、ノード (マスターを含む) の 3 つの IP アドレスが含まれています。
- spark-defaults には以下が含まれます。
- spark.master spark://192.168.1.2:7077
- spark.driver.memory 10g
- spark-env.sh には以下が含まれます。
- SPARK_LOCAL_DIRS= 全ノード間の共有フォルダー
- SPARK_EXECUTOR_MEMORY=10G
- SPARK_DRIVER_MEMORY=10G
- SPARK_WORKER_CORES=1
- SPARK_WORKER_MEMORY=10G
- SPARK_WORKER_INSTANCES=8
- SPARK_WORKER_DIR= 全ノード間の共有フォルダー
- SPARK_WORKER_OPTS="-Dspark.worker.cleanup.enabled=true"
ありがとう