4

scala 関数定義を含む文字列から spark(2.0) で udf を定義しようとしています。スニペットは次のとおりです。

val universe: scala.reflect.runtime.universe.type = scala.reflect.runtime.universe
import universe._
import scala.reflect.runtime.currentMirror
import scala.tools.reflect.ToolBox
val toolbox = currentMirror.mkToolBox()
val f = udf(toolbox.eval(toolbox.parse("(s:String) => 5")).asInstanceOf[String => Int])
sc.parallelize(Seq("1","5")).toDF.select(f(col("value"))).show

これは私にエラーを与えます:

  Caused by: java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of type scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD
   at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2133)
   at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1305)
   at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2024)
   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
   at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
   at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
   at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
   at org.apache.spark.scheduler.Task.run(Task.scala:85)
   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
   at java.lang.Thread.run(Thread.java:745)

ただし、udf を次のように定義すると:

val f = udf((s:String) => 5)

それはうまく動作します。ここでの問題は何ですか?最終的な目的は、scala 関数の定義を持つ文字列を取得し、それを udf として使用することです。

4

2 に答える 2

5

Giovanny が観察したように、問題はクラス ローダーが異なることにあります (.getClass.getClassLoaderオブジェクトを呼び出すことで、これをさらに調査できます)。次に、ワーカーがリフレクションされた関数を逆シリアル化しようとすると、すべてが崩壊します。

これは、クラスローダーのハッキングを含まないソリューションです。アイデアは、反射ステップをワーカーに移動することです。リフレクション ステップをやり直す必要がありますが、ワーカーごとに 1回だけです。これは非常に最適だと思います。マスター ノードでリフレクションを 1 回だけ行ったとしても、ワーカーに関数を認識させるには、ワーカーごとにかなりの作業を行う必要があります。

val f = udf {
  new Function1[String,Int] with Serializable {
    import scala.reflect.runtime.universe._
    import scala.reflect.runtime.currentMirror
    import scala.tools.reflect.ToolBox

    lazy val toolbox = currentMirror.mkToolBox()
    lazy val func = {
      println("reflected function") // triggered at every worker
      toolbox.eval(toolbox.parse("(s:String) => 5")).asInstanceOf[String => Int]
    }

    def apply(s: String): Int = func(s)
  }
}

その後、呼び出しsc.parallelize(Seq("1","5")).toDF.select(f(col("value"))).showは問題なく機能します。

自由にコメントアウトしてprintlnください。これは、リフレクションが発生した回数を数えるための簡単な方法です。それspark-shell --master 'local'は1回だけですが、spark-shell --master 'local[2]'2回です。

使い方

UDF はすぐに評価されますが、ワーカー ノードに到達するまでは使用されないため、遅延値toolboxfuncはワーカーでのみ評価されます。さらに、それらは怠け者であるため、ワーカーごとに 1 回しか評価されません。

于 2016-08-14T20:14:52.367 に答える
3

同じエラーが発生しましたが、JavaDeserializationStream クラスが例外をキャッチしているため、ClassNotFoundException は表示されません。RDD/DataSet から実行しようとしているクラスが見つからないため、環境によっては失敗していますが、 ClassNotFoundError は表示されません。この問題を修正するには、プロジェクトのすべてのクラス (関数と依存関係を含む) を含む jar を生成し、spark 環境内にその jar を含める必要がありました。

スタンドアロン クラスタの場合

conf.setJars ( Array ("/fullpath/yourgeneratedjar.jar", "/fullpath/otherdependencies.jar") )

これはヤーンクラスター用です

conf.set("spark.yarn.jars", "/fullpath/yourgeneratedjar.jar,/fullpath/otherdependencies.jar")
于 2016-08-11T23:42:22.510 に答える