Spark でデータを処理していますが、1 日分のデータ (40G) で動作しますが、1 週間分のデータではOOMで失敗します。
import pyspark
import datetime
import operator
sc = pyspark.SparkContext()
sqc = pyspark.sql.SQLContext(sc)
sc.union([sqc.parquetFile(hour.strftime('.....'))
.map(lambda row:(row.id, row.foo))
for hour in myrange(beg,end,datetime.timedelta(0,3600))]) \
.reduceByKey(operator.add).saveAsTextFile("myoutput")
異なる ID の数は 10k 未満です。各IDは小さめint
です。OOM で失敗するエグゼキューターが多すぎるため、ジョブは失敗します。ジョブが (小さな入力で) 成功する"myoutput"
と、約 100k になります。
- 私は何を間違っていますか?
- に置き換え
saveAsTextFile
てみましたcollect
(実際には、保存する前にPythonでスライスとダイシングを行いたいため)、動作に違いはなく、同じ失敗がありました。これは予想されることですか? - 私は
reduce(lambda x,y: x.union(y), [sqc.parquetFile(...)...])
代わりに持っていましたsc.union
- どちらが良いですか? 違いはありますか?
クラスターには、 825 GB のRAM と224のコアを備えた25のノードがあります。
呼び出しはspark-submit --master yarn --num-executors 50 --executor-memory 5G
.
1 つの RDD には最大 140 の列があり、1 時間のデータをカバーするため、1 週間は 168(=7*24) の RDD の結合になります。