6

Spark でデータを処理していますが、1 日分のデータ (40G) で動作しますが、1 週間分のデータではOOMで失敗します。

import pyspark
import datetime
import operator
sc = pyspark.SparkContext()
sqc = pyspark.sql.SQLContext(sc)
sc.union([sqc.parquetFile(hour.strftime('.....'))
          .map(lambda row:(row.id, row.foo))
          for hour in myrange(beg,end,datetime.timedelta(0,3600))]) \
  .reduceByKey(operator.add).saveAsTextFile("myoutput")

異なる ID の数は 10k 未満です。各IDは小さめintです。OOM で失敗するエグゼキューターが多すぎるため、ジョブは失敗します。ジョブが (小さな入力で) 成功する"myoutput"と、約 100k になります。

  1. 私は何を間違っていますか?
  2. に置き換えsaveAsTextFileてみましたcollect(実際には、保存する前にPythonでスライスとダイシングを行いたいため)、動作に違いはなく、同じ失敗がありました。これは予想されることですか?
  3. 私はreduce(lambda x,y: x.union(y), [sqc.parquetFile(...)...])代わりに持っていましたsc.union- どちらが良いですか? 違いはありますか?

クラスターには、 825 GB のRAM と224のコアを備えた25のノードがあります。

呼び出しはspark-submit --master yarn --num-executors 50 --executor-memory 5G.

1 つの RDD には最大 140 の列があり、1 時間のデータをカバーするため、1 週間は 168(=7*24) の RDD の結合になります。

4

2 に答える 2

2

問題は火花ではなく、糸にあることが判明しました。解決策は、スパークを実行することです

spark-submit --conf spark.yarn.executor.memoryOverhead=1000

(またはヤーン構成を変更します)。

于 2015-03-27T14:05:57.110 に答える
2

Spark では、スケーリング時にメモリ不足エラーが頻繁に発生します。このような場合、微調整はプログラマが行う必要があります。または、コードを再確認して、設定したサイズに関係なく、memoryOverheadを超える可能性が非常に高い、ドライバー内のすべてのビッグデータを収集するなど、やりすぎないことを確認します。

何が起こっているのかを理解するには、がメモリ制限を超えたためにコンテナーを強制終了することを決定したときを理解する必要があります。これは、コンテナーがmemoryOverheadの制限を超えたときに発生します。

スケジューラでは、イベント タイムラインをチェックして、コンテナで何が起こったかを確認できます。Yarn がコンテナーを強制終了した場合、コンテナーは赤く表示され、ホバーまたはクリックすると、次のようなメッセージが表示されます。

メモリ制限を超えたためにコンテナーが YARN によって強制終了されました。16 GB の物理メモリのうち 16.9 GB を使用。spark.yarn.executor.memoryOverhead を増やすことを検討してください。

ここに画像の説明を入力


その場合、注目したいのは次の構成プロパティです (値は私のクラスターでの例です)。

# More executor memory overhead
spark.yarn.executor.memoryOverhead          4096

# More driver memory overhead
spark.yarn.driver.memoryOverhead            8192

# Max on my nodes
#spark.executor.cores                        8
#spark.executor.memory                       12G

# For the executors
spark.executor.cores                        6
spark.executor.memory                       8G

# For the driver
spark.driver.cores                          6
spark.driver.memory                         8G

最初に行うことは、 を増やすことmemoryOverheadです。

ドライバーまたはエグゼキューターで?

UI からクラスターの概要を確認している場合、試行 ID をクリックして、強制終了されたコンテナーの ID を示す診断情報を確認できます。AM Containerと同じ場合はドライバー、そうでない場合はエグゼキューターです。


それでも問題は解決しませんでした。

提供するコアの数とヒープ メモリを微調整する必要があります。pysparkはほとんどの作業をメモリで行うので、ヒープにあまり多くのスペースを与えたくないでしょう。無駄になるからです。ガベージ コレクターに問題が発生するため、与えすぎることは望ましくありません。これらは JVM であることを思い出してください。

hereで説明されているように、ワーカーは複数のエグゼキューターをホストできるため、使用されるコアの数はすべてのエグゼキューターが持つメモリの量に影響するため、#cores を減らすと役立つ場合があります。

Spark と Spark の memoryOverhead の問題に書いてあります– Container exited with a non-zero exit code 143詳細については、ほとんど忘れません! 私が試していない別のオプションは、spark.default.parallelism or/andspark.storage.memoryFractionであり、私の経験に基づいて、役に立ちませんでした。


sds に記載されているように、または次のように構成フラグを渡すことができます。

spark-submit --properties-file my_properties

「my_properties」は、上に挙げた属性のようなものです。

非数値の場合、これを行うことができます:

spark-submit --conf spark.executor.memory='4G' 
于 2016-09-15T20:24:56.080 に答える