23

PySpark でいくつかの操作を実行していますが、最近、構成 (Amazon EMR にある) のノード数を増やしました。ただし、ノード数を 3 倍 (4 から 12) に増やしても、パフォーマンスは変わらないようです。そのため、新しいノードが Spark に表示されるかどうかを確認したいと思います。

次の関数を呼び出しています。

sc.defaultParallelism
>>>> 2

しかし、これは、Spark が認識できるノードの総数ではなく、各ノードに分散されたタスクの総数を示していると思います。

PySpark がクラスターで使用しているノードの量を確認するにはどうすればよいですか?

4

5 に答える 5

19

sc.defaultParallelismは単なるヒントです。構成によっては、ノード数と関係がない場合があります。これは、パーティション カウント引数を受け取る操作を使用するが、それを指定しない場合のパーティションの数です。たとえばsc.parallelize、リストから新しい RDD を作成します。2 番目の引数を使用して、RDD に作成するパーティションの数を指定できます。ただし、この引数のデフォルト値は ですsc.defaultParallelism

Executor の数はsc.getExecutorMemoryStatusScala API で取得できますが、これは Python API では公開されていません。

一般に、RDD にはエグゼキューターの約 4 倍のパーティションを用意することをお勧めします。これは良いヒントです。なぜなら、タスクにかかる時間にばらつきがある場合は、これで均等になるからです。たとえば、一部のエグゼキュータは 5 つの高速タスクを処理し、他のエグゼキュータは 3 つの低速タスクを処理します。

これについては、あまり正確である必要はありません。大まかなアイデアがあれば、見積もりで行くことができます。たとえば、CPU が 200 個未満であることがわかっている場合、500 個のパーティションで問題ないと言えます。

したがって、この数のパーティションで RDD を作成してみてください。

rdd = sc.parallelize(data, 500)     # If distributing local data.
rdd = sc.textFile('file.csv', 500)  # If loading data from a file.

または、RDD の作成を制御しない場合は、計算の前に RDD を再分割します。

rdd = rdd.repartition(500)

を使用して、RDD 内のパーティションの数を確認できますrdd.getNumPartitions()

于 2015-03-01T12:15:25.533 に答える
1

リモートによってセッションが強制終了され、奇妙な Java エラーが発生することがありました。

Py4JJavaError: An error occurred while calling o349.defaultMinPartitions.
: java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext.

私は次の方法でこれを回避しました

def check_alive(spark_conn):
    """Check if connection is alive. ``True`` if alive, ``False`` if not"""
    try:
        get_java_obj = spark_conn._jsc.sc().getExecutorMemoryStatus()
        return True
    except Exception:
        return False

def get_number_of_executors(spark_conn):
    if not check_alive(spark_conn):
        raise Exception('Unexpected Error: Spark Session has been killed')
    try:
        return spark_conn._jsc.sc().getExecutorMemoryStatus().size()
    except:
        raise Exception('Unknown error')
于 2017-07-04T18:24:57.600 に答える