14

Spark のスタンドアロン モードで ec2-script を使用して 10 ノードのクラスターを起動しました。PySpark シェル内から s3 バケットのデータにアクセスしていますが、RDD で変換を実行すると、1 つのノードしか使用されません。たとえば、以下は CommonCorpus からデータを読み込みます。

bucket = ("s3n://@aws-publicdatasets/common-crawl/crawl-data/CC-MAIN-2014-23/"
          "/segments/1404776400583.60/warc/CC-MAIN-20140707234000-00000-ip-10"
          "-180-212-248.ec2.internal.warc.gz")

data = sc.textFile(bucket)
data.count()

これを実行すると、10 個のスレーブのうち 1 つだけがデータを処理します。これは、1 つのスレーブ (213) だけが、Spark Web コンソールから表示したときにアクティビティのログを持っているためです。Ganglia でアクティビティを表示すると、この同じノード (213) が、アクティビティの実行時にメモリ使用量が急増した唯一のスレーブです。ここに画像の説明を入力

さらに、スレーブが 1 つだけの ec2 クラスターで同じスクリプトを実行すると、まったく同じパフォーマンスが得られます。私は Spark 1.1.0 を使用しています。ヘルプやアドバイスをいただければ幸いです。

4

1 に答える 1

18

...ec2.internal.warc.gz

並行してロードできないという点で、gzip ファイルでかなり典型的な問題に遭遇したと思います。より具体的には、単一の gzip ファイルを複数のタスクで並行してロードすることはできないため、Spark は 1 つのタスクでそれをロードし、1 つのパーティションを持つ RDD を提供します。

(ただし、Spark は 10 個の gzip ファイルを並列に正常にロードできることに注意してください。これらの 10 個のファイルのそれぞれは 1 つのタスクでしかロードできないということです。ファイル内ではなく、ファイル全体で並列処理を行うことができます。)

RDD 内のパーティションの数を明示的に確認することで、パーティションが 1 つしかないことを確認できます。

data.getNumPartitions()

RDD で並列に実行できるタスク数の上限は、RDD 内のパーティション数またはクラスター内のスレーブ コア数のいずれか少ない方です。

あなたの場合、RDD パーティションの数です。次のように RDD を再パーティション化することで、これを増やすことができます。

data = sc.textFile(bucket).repartition(sc.defaultParallelism * 3)

なぜsc.defaultParallelism * 3ですか?

Spark チューニング ガイドでは、コアあたり 2 ~ 3 個のタスクを使用することを推奨しており、クラスター内のコア数をsc.defaultParalellism示しています。

于 2014-12-24T04:58:20.450 に答える