4

scala.collection.immutable.MapSpark UDF 内から にアクセスするのが困難です。

マップ配信中です

val browserLangMap = sc.broadcast (Source.fromFile(browserLangFilePath).getLines.map(_.split(,)).map(e => (e(0).toInt,e(1))).toMap)

マップにアクセスする UDF の作成

def addBrowserCode = udf((browserLang:Int) => if(browserLangMap.value.contains(browserLang)) browserLangMap.value(browserLang) else "")`

UDF を使用して新しい列を追加する

val joinedDF = rawDF.join(broadcast(geoDF).as("GEO"), $"start_ip" === $"GEO.start_ip_num", "left_outer")
                        .withColumn("browser_code", addBrowserCode($"browser_language"))
                        .selectExpr(getSelectQuery:_*)

完全なスタック トレース --> https://www.dropbox.com/s/p1d5322fo9cxro6/stack_trace.txt?dl=0

org.apache.spark.SparkException: Task not serializable
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
        at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
        at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
        at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1857)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$MetaDataSchema$
Serialization stack:
        - object not serializable (class: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$MetaDataSchema$, value: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$MetaDataSchema$@30b4ba52)
        - field (class: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC, name: MetaDataSchema$module, type: class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$MetaDataSchema$)
        - object (class org.apache.spark.sql.catalyst.expressions.ScalaUDF, UDF(browser_language#235))
        - field (class: org.apache.spark.sql.catalyst.expressions.If, name: falseValue, type: class org.apache.spark.sql.catalyst.expressions.Expression)
        - object (class org.apache.spark.sql.catalyst.expressions.If, if (isnull(browser_language#235)) null else UDF(browser_language#235))
        - field (class: org.apache.spark.sql.catalyst.expressions.Alias, name: child, type: class org.apache.spark.sql.catalyst.expressions.Expression)
        - object (class org.apache.spark.sql.catalyst.expressions.Alias, if (isnull(browser_language#235)) null else UDF(browser_language#235) AS browser_language#507)
        - object (class org.apache.spark.OneToOneDependency, org.apache.spark.OneToOneDependency@5ae38c4e)
        - writeObject data (class: scala.collection.immutable.$colon$colon)
        - object (class scala.collection.immutable.$colon$colon, List(org.apache.spark.OneToOneDependency@5ae38c4e))
        - field (class: org.apache.spark.rdd.RDD, name: org$apache$spark$rdd$RDD$$dependencies_, type: interface scala.collection.Seq)
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
        ... 80 more

これを引き起こしているのはブロードキャストマップへのアクセスであることを私は知っています。UDF でそれへの参照を削除すると、例外はありません。

def addBrowserCode = udf((browserLang:Int) => browserLang.toString())  //Test UDF without accessing broadcast Map and it works

スパークバージョン 1.6

4

2 に答える 2

4

これは、spark シェルの ":paste" を使用した場合の奇妙な動作であることがわかりました。これは、コード全体を :paste を使用して単一の複数行の貼り付けに貼り付けた場合にのみ発生します。

最初にブロードキャストと UDF の作成を貼り付けてから、別の :paste に join+saveToFile を貼り付けると、同じコードが完全に機能します。

scala シェルの問題である可能性があります。知らない。

于 2016-02-09T01:27:48.607 に答える