Spark データセットは、Row からEncoder
Pojo/プリミティブの に移動します。Catalyst
エンジンは を使用して、SQL 式のExpressionEncoder
列を変換します。Encoder
ただし、独自の実装のテンプレートとして使用できる他のサブクラスはないようです。
これは、Spark 1.X / DataFrames で問題なく、新しい体制でコンパイルされないコードの例です。
//mapping each row to RDD tuple
df.map(row => {
var id: String = if (!has_id) "" else row.getAs[String]("id")
var label: String = row.getAs[String]("label")
val channels : Int = if (!has_channels) 0 else row.getAs[Int]("channels")
val height : Int = if (!has_height) 0 else row.getAs[Int]("height")
val width : Int = if (!has_width) 0 else row.getAs[Int]("width")
val data : Array[Byte] = row.getAs[Any]("data") match {
case str: String => str.getBytes
case arr: Array[Byte@unchecked] => arr
case _ => {
log.error("Unsupport value type")
null
}
}
(id, label, channels, height, width, data)
}).persist(StorageLevel.DISK_ONLY)
}
のコンパイラエラーが発生します
Error:(56, 11) Unable to find encoder for type stored in a Dataset.
Primitive types (Int, String, etc) and Product types (case classes) are supported
by importing spark.implicits._ Support for serializing other types will be added in future releases.
df.map(row => {
^
したがって、どういうわけか/どこかに手段があるはずです
- カスタム Encoder を定義/実装する
- でマッピングを実行するときにそれを適用します
DataFrame
(現在はタイプのデータセットですRow
) - 他のカスタム コードで使用するエンコーダーを登録します。
これらの手順を正常に実行するコードを探しています。