4

次のスニペットではtryParquet、Parquet ファイルが存在する場合、関数はデータセットを読み込もうとします。そうでない場合は、提供されたデータセット プランを計算して保持し、返します。

import scala.util.{Try, Success, Failure}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.Dataset

sealed trait CustomRow

case class MyRow(
  id: Int,
  name: String
) extends CustomRow

val ds: Dataset[MyRow] =
  Seq((1, "foo"),
      (2, "bar"),
      (3, "baz")).toDF("id", "name").as[MyRow]

def tryParquet[T <: CustomRow](session: SparkSession, path: String, target: Dataset[T]): Dataset[T] =
    Try(session.read.parquet(path)) match {
      case Success(df) => df.as[T] // <---- compile error here
      case Failure(_)  => {
        target.write.parquet(path)
        target
      }
    }

val readyDS: Dataset[MyRow] =
    tryParquet(spark, "/path/to/file.parq", ds)

ただし、これにより次のコンパイルエラーが発生しますdf.as[T]

データセットに格納されているタイプのエンコーダが見つかりません。プリミティブ型 (Int、String など) と製品型 (ケース クラス) は、spark.implicits._ をインポートすることでサポートされます。

他のタイプのシリアル化のサポートは、将来のリリースで追加される予定です。

case Success(df) => df.as[T]

型なしを返すようにtryParquetキャストを行い、呼び出し元が目的のコンストラクターにキャストできるようにすることで、この問題を回避できます。ただし、型を関数によって内部的に管理したい場合の解決策はありますか?dfDataFrame

4

1 に答える 1