Spark Jobs の新人で、次の問題があります。
新しく結合されたデータフレームのいずれかでカウントを実行すると、ジョブが長時間実行され、メモリがディスクに流出します。ここに論理エラーはありますか?
// pass spark configuration
val conf = new SparkConf()
.setMaster(threadMaster)
.setAppName(appName)
// Create a new spark context
val sc = new SparkContext(conf)
// Specify a SQL context and pass in the spark context we created
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// Create three dataframes for sent and clicked files. Mark them as raw, since they will be renamed
val dfSentRaw = sqlContext.read.parquet(inputPathSent)
val dfClickedRaw = sqlContext.read.parquet(inputPathClicked)
val dfFailedRaw = sqlContext.read.parquet(inputPathFailed)
// Rename the columns to avoid ambiguity when accessing the fields later
val dfSent = dfSentRaw.withColumnRenamed("customer_id", "sent__customer_id")
.withColumnRenamed("campaign_id", "sent__campaign_id")
.withColumnRenamed("ced_email", "sent__ced_email")
.withColumnRenamed("event_captured_dt", "sent__event_captured_dt")
.withColumnRenamed("riid", "sent__riid")
val dfClicked = dfClickedRaw.withColumnRenamed("customer_id", "clicked__customer_id")
.withColumnRenamed("event_captured_dt", "clicked__event_captured_dt")
val dfFailed = dfFailedRaw.withColumnRenamed("customer_id", "failed__customer_id")
// LEFT Join with CLICKED on two fields, customer_id and campaign_id
val dfSentClicked = dfSent.join(dfClicked, dfSent("sent__customer_id") === dfClicked("clicked__customer_id")
&& dfSent("sent__campaign_id") === dfClicked("campaign_id"), "left")
dfSentClicked.count() //THIS WILL NOT WORK
val dfJoined = dfSentClicked.join(dfFailed, dfSentClicked("sent__customer_id") === dfFailed("failed__customer_id")
&& dfSentClicked("sent__campaign_id") === dfFailed("campaign_id"), "left")
これらの 2 つまたは 3 つのデータフレームをもうカウントできないのはなぜですか? 名前を変更してインデックス作成を台無しにしましたか?
ありがとうございました!