3

Google クラウド ストレージから読み取った大量のデータ ( 2TB ) を処理するジョブを Yarn モードで実行しようとしています。

パイプラインは次のように要約できます。

sc.textFile("gs://path/*.json")\
.map(lambda row: json.loads(row))\
.map(toKvPair)\
.groupByKey().take(10)

 [...] later processing on collections and output to GCS.
  This computation over the elements of collections is not associative,
  each element is sorted in it's keyspace.

10GBのデータで実行すると、問題なく完了します。ただし、完全なデータセットで実行すると、コンテナー内の次のログで常に失敗します。

15/11/04 16:08:07 WARN org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: ApplicationMaster has disassociated: xxxxxxxxxxx
15/11/04 16:08:07 ERROR org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend: Yarn application has already exited with state FINISHED!
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/lib/spark/python/pyspark/rdd.py", line 1299, in take
    res = self.context.runJob(self, takeUpToNumLeft, p)
  File "/usr/lib/spark/python/pyspark/context.py", line 916, in runJob
15/11/04 16:08:07 WARN org.apache.spark.ExecutorAllocationManager: No stages are running, but numRunningTasks != 0
    port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)
  File "/usr/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
  File "/usr/lib/spark/python/pyspark/sql/utils.py", line 36, in deco
    return f(*a, **kw)
  File "/usr/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job cancelled because SparkContext was shut down

マスターに接続して各操作を1つずつ起動して調べてみたところ、groupByで失敗するようです。また、ノードを追加してメモリと CPU の数をアップグレードすることでクラスターの再スケーリングを試みましたが、それでも同じ問題が発生します。

同じ仕様の 120 ノード + 1 マスター: 8 vCPU - 52 GB メモリ

同様の問題を抱えたスレッドを見つけようとしましたが成功しませんでした。ログがあまり明確ではないため、どのような情報を提供すればよいかわかりません。詳細についてはお気軽にお問い合わせください。

主キーはすべてのレコードに必要な値であり、フィルタなしのすべてのキーが必要です。これは約 60 万キーを表します。クラスターを大規模なものにスケーリングせずに、この操作を実行することは本当に可能ですか? 私は、databricks が 100 TB のデータ ( https://databricks.com/blog/2014/10/10/spark-petabyte-sort.html ) で並べ替えを行ったことを読みましたが、これには大規模なシャッフルも含まれます。複数のインメモリ バッファを 1 つのバッファに置き換えることで成功し、多くのディスク IO が発生しました。私のクラスター規模でそのような操作を実行することは可能ですか?

4

1 に答える 1

2

元の質問に対するコメントから学んだことを要約すると、小さなデータセット (特に、1 台のマシンの合計メモリに収まる可能性があるデータセット) が機能し、クラスターに大幅に多くのノードを追加したにもかかわらず、大きなデータセットが失敗した場合、あらゆる使用法と組み合わされます。のgroupByKey場合、最も一般的に確認することは、データのキーごとのレコード数に大きな不均衡があるかどうかです。

特に、groupByKey今日の時点でも、単一のキーのすべての値を同じマシンにシャッフルする必要があるだけでなく、すべてがメモリに収まる必要があるという制約があります。

https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala#L503

/**
 * Group the values for each key in the RDD into a single sequence. Allows controlling the
 * partitioning of the resulting key-value pair RDD by passing a Partitioner.
 * The ordering of elements within each group is not guaranteed, and may even differ
 * each time the resulting RDD is evaluated.
 *
 * Note: This operation may be very expensive. If you are grouping in order to perform an
 * aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]]
 * or [[PairRDDFunctions.reduceByKey]] will provide much better performance.
 *
 * Note: As currently implemented, groupByKey must be able to hold all the key-value pairs for any
 * key in memory. If a key has too many values, it can result in an [[OutOfMemoryError]].
 */

この問題については、回避策の議論を含むメーリング リストの議論を指し示すさらなる議論があります。つまり、値/レコードのハッシュ文字列をキーに明示的に追加し、バケットの小さなセットにハッシュして、大きなグループを手動で分割できる場合があります。

あなたの場合.map、既知のホットキーのキーを条件付きで調整してサブグループに分割し、非ホットキーを変更しないままにする最初の変換を行うこともできます。

一般に、「インメモリ」制約は、ノードを追加しても大幅に歪んだキーを実際に回避できないことを意味します。これは、ホット ノードで「インプレース」スケーリングする必要があるためです。特定のケースでは、最大キーの値がすべてその 30g に収まる限りspark.executor.memory--confまたは dataprocで設定できる場合があります(ヘッドルーム/オーバーヘッドも多少あります)。gcloud beta dataproc jobs submit spark [other flags] --properties spark.executor.memory=30gしかし、それは利用可能な最大のマシンで最大になるため、データセット全体が大きくなると最大キーのサイズが大きくなる可能性がある場合は、単一の実行プログラムのメモリを増やすよりも、キーの配布自体を変更することをお勧めします。 .

于 2015-11-05T00:41:41.667 に答える