12

私は Apache Spark 2.0 を使用しておりcase class、言及用のスキーマを作成していますDetaSetHow to store custom objects in Dataset?に従ってカスタム エンコーダーを定義しようとすると、、java.time.LocalDate次の例外が発生したため:

java.lang.UnsupportedOperationException: No Encoder found for java.time.LocalDate
- field (class: "java.time.LocalDate", name: "callDate")
- root class: "FireService"
at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:598)
at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.apply(ScalaReflection.scala:592)
at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.apply(ScalaReflection.scala:583)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
............

以下はコードによるものです:

case class FireService(callNumber: String, callDate: java.time.LocalDate)
implicit val localDateEncoder: org.apache.spark.sql.Encoder[java.time.LocalDate] = org.apache.spark.sql.Encoders.kryo[java.time.LocalDate]

val fireServiceDf = df.map(row => {
val dateFormatter = java.time.format.DateTimeFormatter.ofPattern("MM/dd /yyyy")
FireService(row.getAs[String](0),  java.time.LocalDate.parse(row.getAs[String](4), dateFormatter))
})

Spark 用のサードパーティ API のエンコーダーを定義するにはどうすればよいですか?

アップデート

ケース クラス全体のエンコーダを作成するときはdf.map..、次のようにオブジェクトをバイナリにマップします。

implicit val fireServiceEncoder: org.apache.spark.sql.Encoder[FireService] = org.apache.spark.sql.Encoders.kryo[FireService]

val fireServiceDf = df.map(row => {
 val dateFormatter = java.time.format.DateTimeFormatter.ofPattern("MM/dd/yyyy")
 FireService(row.getAs[String](0), java.time.LocalDate.parse(row.getAs[String](4), dateFormatter))
})

fireServiceDf: org.apache.spark.sql.Dataset[FireService] = [value: binary]

FireService のマップを期待していますが、マップのバイナリを返します。

4

1 に答える 1

5

最後のコメントにあるように、「クラスにフィールド Bar が含まれている場合は、オブジェクト全体のエンコーダが必要です。」FireServiceそれ自体に暗黙的な Encoder を提供する必要があります。それ以外の場合、Spark は を使用して作成しますSQLImplicits.newProductEncoder[T <: Product : TypeTag]: Encoder[T]implicitタイプから、フィールドに Encoder パラメーターを使用していないことがわかります。そのため、の存在を使用できませんlocalDateEncoder

Spark はこれを処理するように変更できます。たとえば、Shapeless ライブラリを使用したり、マクロを直接使用したりできます。これが将来の計画であるかどうかはわかりません。

于 2016-08-03T10:02:52.550 に答える