5

two case classBase を拡張することを書きましたabstract class。各クラスの 2 つのリスト (listAおよびlistB) があります。これら 2 つのリストをマージしたい場合、最終的なリストを Apache Spark 1.6.1 データセットに変換できません。

abstract class Base

case class A(name: String) extends Base
case class B(age: Int) extends Base

val listA: List[A] = A("foo")::A("bar")::Nil
val listB: List[B] = B(10)::B(20)::Nil
val list: List[Base with Product with Serializable] = listA ++ listB

val result: RDD[Base with Product with Serializable] = sc.parallelize(list).toDS()

Apache Spark では、次の例外が発生します。

A needed class was not found. This could be due to an error in your runpath. Missing class: no Java class corresponding to Base with Product with Serializable found
java.lang.NoClassDefFoundError: no Java class corresponding to Base with Product with Serializable found
    at scala.reflect.runtime.JavaMirrors$JavaMirror.typeToJavaClass(JavaMirrors.scala:1299)
    at scala.reflect.runtime.JavaMirrors$JavaMirror.runtimeClass(JavaMirrors.scala:192)
    at scala.reflect.runtime.JavaMirrors$JavaMirror.runtimeClass(JavaMirrors.scala:54)
    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:50)
    at org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImplicits.scala:41)

Sparkから RDD を作成したい場合list、例外はスローされませんが、toDS()メソッドを使用して RDD をデータセットに変換すると、この前の例外がスローされます。

4

1 に答える 1

3

まず、明示的に にするか、ケースクラス/オブジェクトによってのみ拡張することを意図している場合は追加listすることで、より適切な型を取得できます。でもこれだけじゃ足りないからList[Base]Base extends Product with Serializable

Spark 1.6 には、プリミティブ型 (String、Integer、Long など)、Scala ケース クラス、Java Beans など、さまざまな型のエンコーダーの自動生成がサポートされています。

のような抽象クラスBaseはサポートされていないことに注意してください。また、カスタム エンコーダーもサポートされていません。kryo(またはjavaSerialization最後の手段として) エンコーダーを使用することもできますが、カスタム オブジェクトをデータセットに保存する方法を参照してください。.

完全な作業例を次に示します。

abstract class Base extends Serializable with Product

case class A(name: String) extends Base

case class B(age: Int) extends Base

object BaseEncoder {
  implicit def baseEncoder: org.apache.spark.Encoder[Base] = org.apache.spark.Encoders.kryo[Base]
}


val listA: Seq[A] = Seq(A("a"), A("b"))
val listB: Seq[B] = Seq(B(1), B(2))
val list: Seq[Base] = listA ++ listB

val ds = sc.parallelize(list).toDS
于 2016-05-29T15:09:08.153 に答える