7

次のスニペットに示すように、RDD の各パーティションがそのオブジェクトにアクセスできるように、ドライバー ノードから RDD が存在する他のノードにオブジェクトを渡したいと考えています。

object HelloSpark {
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
                .setAppName("Testing HelloSpark")
                .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                .set("spark.kryo.registrator", "xt.HelloKryoRegistrator")

        val sc = new SparkContext(conf)
        val rdd = sc.parallelize(1 to 20, 4)
        val bytes = new ImmutableBytesWritable(Bytes.toBytes("This is a test"))

        rdd.map(x => x.toString + "-" + Bytes.toString(bytes.get) + " !")
            .collect()
            .foreach(println)

        sc.stop
    }
}

// My registrator
class HelloKryoRegistrator extends KryoRegistrator {
    override def registerClasses(kryo: Kryo) = {
        kryo.register(classOf[ImmutableBytesWritable], new HelloSerializer())
    }
}

//My serializer 
class HelloSerializer extends Serializer[ImmutableBytesWritable] {
    override def write(kryo: Kryo, output: Output, obj: ImmutableBytesWritable): Unit = {
        output.writeInt(obj.getLength)
        output.writeInt(obj.getOffset)
        output.writeBytes(obj.get(), obj.getOffset, obj.getLength)
    }

    override def read(kryo: Kryo, input: Input, t: Class[ImmutableBytesWritable]): ImmutableBytesWritable = {
        val length = input.readInt()
        val offset = input.readInt()
        val bytes  = new Array[Byte](length)
        input.read(bytes, offset, length)

        new ImmutableBytesWritable(bytes)
    }
}

上記のスニペットでは、Spark で Kryo によってImmutableBytesWritableをシリアル化しようとしたため、次のようにしました。

  1. spark コンテキストに渡されるSparkConfインスタンスを構成します。つまり、「 spark.serializer」を「org.apache.spark.serializer.KryoSerializer」に設定し、「spark.kryo.registrator」を「xt.HelloKryoRegistrator 」に設定します。
  2. ImmutableBytesWritableクラスを登録するカスタム Kryo 登録クラスを作成します。
  3. ImmutableBytesWritableのシリアライザーを作成する

ただし、Spark アプリケーションを yarn-client モードで送信すると、次の例外がスローされました。

スレッド "メイン" org.apache.spark.SparkException での例外: タスクは org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166) でシリアライズできません org.apache.spark.util.ClosureCleaner$.clean で(ClosureCleaner.scala:158) org.apache.spark.SparkContext.clean(SparkContext.scala:1242) で org.apache.spark.rdd.RDD.map(RDD.scala:27​​0) で xt.HelloSpark$.main で(HelloSpark.scala:23) xt.HelloSpark.main(HelloSpark.scala) で sun.reflect.NativeMethodAccessorImpl.invoke0(ネイティブ メソッド) で sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) で sun.reflect. org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:325) org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75) で org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) で 原因: java.io.NotSerializableException: org.apache.hadoop.hbase.io.ImmutableBytesWritable at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) at java.io.ObjectOutputStream.writeSerialData( ObjectOutputStream.java:1508) で java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) で java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) で java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) で) org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42) で org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73) at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164) ... 12 詳細

ImmutableBytesWritableは Kryo でシリアル化できないようです。では、Kryo を使用して Spark にオブジェクトをシリアル化させる正しい方法は何でしょうか? Kryo は任意の型をシリアル化できますか?

4

1 に答える 1