2

各マシンに 9 コア / 80G (合計 27 コア / 240G RAM) を備えたローカル マシンにスタンドアロンの Spark (2.1.1) クラスターを作成しました。

1 から x までのすべての数値を合計するサンプル Spark ジョブがあります。これがコードです。

package com.example

import org.apache.spark.sql.SparkSession

object ExampleMain {

    def main(args: Array[String]): Unit = {
      val spark = SparkSession.builder
          .master("spark://192.168.1.2:7077")
          .config("spark.driver.maxResultSize" ,"3g")
          .appName("ExampleApp")
          .getOrCreate()
      val sc = spark.SparkContext
      val rdd = sc.parallelize(Lisst.range(1, 1000))
      val sum = rdd.reduce((a,b) => a+b)
      println(sum)
      done
    }

    def done = {
      println("\n\n")
      println("-------- DONE --------")
    }
}

上記のコードを実行すると、数秒後に結果が得られるので、コードをクランチして、1 から 1B (1,000,000,000) までのすべての数値を合計すると、GC のオーバーヘッド制限に達します。

十分なメモリがない場合、スパークはメモリを HDD にスピルする必要があることを読みました。クラスター構成で遊んでみましたが、役に立ちませんでした。

Driver memory = 6G
Number of workers = 24
Cores per worker = 1
Memory per worker = 10

私は開発者ではなく、Scala の知識もありませんが、GC の問題なしでこのコードを実行するための解決策を見つけたいと考えています。

@philantrovert リクエストごとに、spark-submit コマンドを追加しています

/opt/spark-2.1.1/bin/spark-submit \
--class "com.example.ExampleMain" \
--master spark://192.168.1.2:6066 \
--deploy-mode cluster \
/mnt/spark-share/example_2.11-1.0.jar

さらに、私のspark/confは次のとおりです:

  • スレーブ ファイルには、ノード (マスターを含む) の 3 つの IP アドレスが含まれています。
  • spark-defaults には以下が含まれます。
    • spark.master spark://192.168.1.2:7077
    • spark.driver.memory 10g
  • spark-env.sh には以下が含まれます。
    • SPARK_LOCAL_DIRS= 全ノード間の共有フォルダー
    • SPARK_EXECUTOR_MEMORY=10G
    • SPARK_DRIVER_MEMORY=10G
    • SPARK_WORKER_CORES=1
    • SPARK_WORKER_MEMORY=10G
    • SPARK_WORKER_INSTANCES=8
    • SPARK_WORKER_DIR= 全ノード間の共有フォルダー
    • SPARK_WORKER_OPTS="-Dspark.worker.cleanup.enabled=true"

ありがとう

4

1 に答える 1