39

以下で説明するように、コードで同じことをしようとしているとき

dataframe.map(row => {
  val row1 = row.getAs[String](1)
  val make = if (row1.toLowerCase == "tesla") "S" else row1
  Row(row(0),make,row(2))
})

ここから上記の参照を取得しました: Scala: How can I replace value in Dataframs using scala しかし、エンコーダーエラーが発生しています

データセットに格納されているタイプのエンコーダが見つかりません。プリミティブ型 (Int、S string など) と Product 型 (ケース クラス) は、spark.im plicits をインポートすることでサポートされます。_ 他の型をシリアル化するためのサポートは、将来のリリースで追加される予定です。

注: 私は spark 2.0 を使用しています!

4

4 に答える 4

85

ここで予期しないことは何もありません。Spark 1.x で記述され、Spark 2.0 ではサポートされなくなったコードを使用しようとしています。

  • 1.xDataFrame.mapでは((Row) ⇒ T)(ClassTag[T]) ⇒ RDD[T]
  • 2.xDataset[Row].mapでは((Row) ⇒ T)(Encoder[T]) ⇒ Dataset[T]

正直なところ、1.x でもあまり意味がありませんでした。バージョンに関係なく、単純にDataFrameAPIを使用できます。

import org.apache.spark.sql.functions.{when, lower}

val df = Seq(
  (2012, "Tesla", "S"), (1997, "Ford", "E350"),
  (2015, "Chevy", "Volt")
).toDF("year", "make", "model")

df.withColumn("make", when(lower($"make") === "tesla", "S").otherwise($"make"))

本当に使用したい場合mapは、 statically typed を使用する必要がありますDataset:

import spark.implicits._

case class Record(year: Int, make: String, model: String)

df.as[Record].map {
  case tesla if tesla.make.toLowerCase == "tesla" => tesla.copy(make = "S")
  case rec => rec
}

または、少なくとも暗黙のエンコーダーを持つオブジェクトを返します。

df.map {
  case Row(year: Int, make: String, model: String) => 
    (year, if(make.toLowerCase == "tesla") "S" else make, model)
}

最後に、完全にクレイジーな理由で本当にマッピングしたいDataset[Row]場合は、必要なエンコーダーを提供する必要があります。

import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row

// Yup, it would be possible to reuse df.schema here
val schema = StructType(Seq(
  StructField("year", IntegerType),
  StructField("make", StringType),
  StructField("model", StringType)
))

val encoder = RowEncoder(schema)

df.map {
  case Row(year, make: String, model) if make.toLowerCase == "tesla" => 
    Row(year, "S", model)
  case row => row
} (encoder)
于 2016-09-11T06:48:51.773 に答える