21

Spark データセットは、Row からEncoderPojo/プリミティブの に移動します。Catalystエンジンは を使用して、SQL 式のExpressionEncoder列を変換します。Encoderただし、独自の実装のテンプレートとして使用できる他のサブクラスはないようです。

これは、Spark 1.X / DataFrames で問題なく、新しい体制でコンパイルされないコードの例です。

//mapping each row to RDD tuple
df.map(row => {
    var id: String = if (!has_id) "" else row.getAs[String]("id")
    var label: String = row.getAs[String]("label")
    val channels  : Int = if (!has_channels) 0 else row.getAs[Int]("channels")
    val height  : Int = if (!has_height) 0 else row.getAs[Int]("height")
    val width : Int = if (!has_width) 0 else row.getAs[Int]("width")
    val data : Array[Byte] = row.getAs[Any]("data") match {
      case str: String => str.getBytes
      case arr: Array[Byte@unchecked] => arr
      case _ => {
        log.error("Unsupport value type")
        null
      }
    }
    (id, label, channels, height, width, data)
  }).persist(StorageLevel.DISK_ONLY)

}

のコンパイラエラーが発生します

Error:(56, 11) Unable to find encoder for type stored in a Dataset.
Primitive types (Int, String, etc) and Product types (case classes) are supported 
by importing spark.implicits._  Support for serializing other types will be added in future releases.
    df.map(row => {
          ^

したがって、どういうわけか/どこかに手段があるはずです

  • カスタム Encoder を定義/実装する
  • でマッピングを実行するときにそれを適用しますDataFrame(現在はタイプのデータセットですRow)
  • 他のカスタム コードで使用するエンコーダーを登録します。

これらの手順を正常に実行するコードを探しています。

4

3 に答える 3

2

暗黙のエンコーダーをインポートしましたか?

import spark.implicits._

http://spark.apache.org/docs/2.0.0-preview/api/scala/index.html#org.apache.spark.sql.Encoder

于 2016-06-11T23:56:22.987 に答える