私はスパークするのが初めてです。ハイブクエリの下にあり、その上で、pythonでsparkを使用してピボット操作を実行しています。
以下の pyspark スクリプトは、いくつかのピボット操作を行い、ハイブ テーブルに書き込みます。Hive クエリは 1 億 4000 万行を返します。
アプローチ1
from pyspark import SparkContext
from pyspark import HiveContext
from pyspark.sql import functions as F
sc = SparkContext()
hc = HiveContext(sc)
tbl = hc.sql("""
Select Rating.BranchID
, Rating.Vehicle
, Rating.PersonalAutoCov
, Rating.PersonalVehicleCov
, Rating.EffectiveDate
, Rating.ExpirationDate
, attr.name as RatingAttributeName
, Cast(Rating.OutputValue as Int) OutputValue
, Rating.InputValue
From db.dbo_pcx_paratingdata_piext_master rating
Inner Join db.dbo_pctl_ratingattrname_piext_master attr
on rating.RatingAttribute = attr.id
and attr.CurrentRecordIndicator = 'Y'
Where
rating.CurrentRecordIndicator = 'Y'
""")
tbl.cache()
pvttbl1 = tbl.groupby("BranchId","Vehicle","PersonalAutoCov","PersonalVehicleCov","EffectiveDate","ExpirationDate")\
.pivot("RatingAttributeName")\
.agg({"InputValue":"max", "OutputValue":"sum"})
pvttbl1.createOrReplaceTempView("paRatingAttributes")
hc.sql("Create table dev_pekindataaccesslayer.createcount as select * from paRatingAttributes")
上記のスクリプトを spark-submit コマンドで実行しているとき、最終的には
java.lang.OutOfMemoryError: Java ヒープ領域
または何度か
java.lang.OutOfMemoryError: GC オーバーヘッドの制限を超えました
私が使用したspark-submitコマンド。
spark-submit spark_ex2.py --master yarn-cluster --num-executors 15 --executor-cores 50 --executor-memory 100g --driver-memory 100g, --conf `"spark.sql.shuffle.partitions=1000", --conf "spark.memory.offHeap.enabled=true", --conf "spark.memory.offHeap.size=100g",--conf "spark.network.timeout =1200", --conf "spark.executor.heartbeatInterval=1201"`
詳細なログ:
INFO MemoryStore: Memory use = 1480.9 KB (blocks) + 364.8 MB (scratch space shared across 40 tasks(s)) = 366.2 MB.
Storage limit = 366.3 MB.
WARN BlockManager: Persisting block rdd_11_22 to disk instead.
WARN BlockManager: Putting block rdd_11_0 failed due to an exception
WARN BlockManager: Block rdd_11_0 could not be removed as it was not found on disk or in memory
ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 10)
java.lang.OutOfMemoryError: Java heap space
at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
上記のpysparkスクリプトに少し変更を加えましたが、問題なく動作します
アプローチ 2
from pyspark import SparkContext
from pyspark import HiveContext
from pyspark.sql import SQLContext
from pyspark.sql import functions as F
sc = SparkContext()
hc = HiveContext(sc)
sqlContext = SQLContext(sc)
tbl = hc.sql("""
Select Rating.BranchID
, Rating.Vehicle
, Rating.PersonalAutoCov
, Rating.PersonalVehicleCov
, Rating.EffectiveDate
, Rating.ExpirationDate
, attr.name as RatingAttributeName
, Cast(Rating.OutputValue as Int) OutputValue
, Rating.InputValue
From db.dbo_pcx_paratingdata_piext_master rating
Inner Join db.dbo_pctl_ratingattrname_piext_master attr
on rating.RatingAttribute = attr.id
and attr.CurrentRecordIndicator = 'Y'
Where
rating.CurrentRecordIndicator = 'Y'
""")
tbl.createOrReplaceTempView("Ptable")
r=sqlContext.sql("select count(1) from Ptable")
m=r.collect()[0][0]
hc.sql("drop table if exists db.Ptable")
hc.sql("Create table db.Ptable as select * from Ptable")
tb2 = hc.sql("select * from db.Ptable limit "+str(m))
pvttbl1 = tb2.groupby("BranchId","Vehicle","PersonalAutoCov","PersonalVehicleCov","EffectiveDate","ExpirationDate")\
.pivot("RatingAttributeName")\
.agg({"InputValue":"max", "OutputValue":"sum"})
pvttbl1.createOrReplaceTempView("paRatingAttributes")
hc.sql("drop table if exists db.createcount")
hc.sql("Create table db.createcount STORED AS ORC as select * from paRatingAttributes")
ただし、上記のスクリプトには中間テーブルの作成が含まれており、追加の手順です。アプローチ2では、同じspark-submitコマンドでlimitキーワードを保持すると正しく機能します。
私のアプローチ1の何が問題なのですか?どうすればそれを機能させることができますか?
注: Spark java.lang.OutOfMemoryError: Java heap spaceに従い、提案されたすべての conf パラメータを試してみましたが、まだ運がありません。