
I'm new to Spark jobs and have the following problem.

When I run a count on either of the newly joined dataframes, the job runs for a very long time and spills memory to disk. Is there a logical error here?

    // pass spark configuration
    val conf = new SparkConf()
      .setMaster(threadMaster)
      .setAppName(appName)

    // Create a new spark context
    val sc = new SparkContext(conf)

    // Specify a SQL context and pass in the spark context we created
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)


    // Create three dataframes for the sent, clicked, and failed files. Mark them as raw, since they will be renamed
    val dfSentRaw = sqlContext.read.parquet(inputPathSent)
    val dfClickedRaw = sqlContext.read.parquet(inputPathClicked)
    val dfFailedRaw  = sqlContext.read.parquet(inputPathFailed)



    // Rename the columns to avoid ambiguity when accessing the fields later
    val dfSent = dfSentRaw.withColumnRenamed("customer_id", "sent__customer_id")
      .withColumnRenamed("campaign_id", "sent__campaign_id")
      .withColumnRenamed("ced_email", "sent__ced_email")
      .withColumnRenamed("event_captured_dt", "sent__event_captured_dt")
      .withColumnRenamed("riid", "sent__riid")


    val dfClicked = dfClickedRaw.withColumnRenamed("customer_id", "clicked__customer_id")
      .withColumnRenamed("event_captured_dt", "clicked__event_captured_dt")
    val dfFailed = dfFailedRaw.withColumnRenamed("customer_id", "failed__customer_id")


    // LEFT Join with CLICKED on two fields, customer_id and campaign_id
    val dfSentClicked = dfSent.join(dfClicked, dfSent("sent__customer_id") === dfClicked("clicked__customer_id")
      && dfSent("sent__campaign_id") === dfClicked("campaign_id"), "left")
    dfSentClicked.count() // THIS WILL NOT WORK

    val dfJoined = dfSentClicked.join(dfFailed, dfSentClicked("sent__customer_id") === dfFailed("failed__customer_id")
      && dfSentClicked("sent__campaign_id") === dfFailed("campaign_id"), "left")
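In case it helps, this is roughly how I've been trying to check whether duplicate join keys could be blowing up the row count (just a diagnostic sketch I put together, not part of the job itself; the variable names are my own):

```scala
// Diagnostic sketch: look for duplicate (customer_id, campaign_id) pairs
// on each side of the join. If both sides have duplicates for the same key,
// the LEFT join produces a many-to-many cross product per key, and the
// joined dataframe can be far larger than either input - which would
// explain a slow count() that spills to disk.
val sentDupKeys = dfSent
  .groupBy("sent__customer_id", "sent__campaign_id")
  .count()
  .filter("count > 1")

val clickedDupKeys = dfClicked
  .groupBy("clicked__customer_id", "campaign_id")
  .count()
  .filter("count > 1")

// If either of these shows many rows, the join itself is the problem,
// not the count().
sentDupKeys.show(20)
clickedDupKeys.show(20)
```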

Why can I no longer count these two or three dataframes? Did the renaming mess up the indexing somehow?

Thank you!
