基本的に、Spark を使用して HDFS で複雑な JSON を分析する必要があります。
「for Comprehensions」を使用してJSONを(事前に)フィルタリングし、json4sの「extract」メソッドを使用してケースクラスにラップします
これはうまくいきます!
def foo(rdd: RDD[String]) = {
case class View(C: String,b: Option[Array[List[String]]], t: Time)
case class Time($numberLong: String)
implicit val formats = DefaultFormats
rdd.map { jsonString =>
val jsonObj = parse(jsonString)
val listsOfView = for {
JObject(value) <- jsonObj
JField(("v"), JObject(views)) <- value
normalized <- views.map(x => (x._2))
} yield normalized
}
ここまでは順調ですね!
(事前に) フィルター処理された JSON を CaseClass に抽出しようとすると、次のようになります。
スレッド "メイン" org.apache.spark.SparkException での例外: ステージの失敗によりジョブが中止されました: タスクはシリアル化できません: java.io.NotSerializableException: org.json4s.DefaultFormats$
ここで抽出したコード:
def foo(rdd: RDD[String]) = {
case class View(C: String,b: Option[Array[List[String]]], t: Time)
case class Time($numberLong: String)
implicit val formats = DefaultFormats
rdd.map { jsonString =>
val jsonObj = parse(jsonString)
val listsOfView = for {
JObject(value) <- jsonObj
JField(("v"), JObject(views)) <- value
normalized <- views.map(x => (x._2))
} yield normalized.extract[View]
}
私はすでに scala ws で自分のコードを試しました。hdfsとsparkのことは本当に初めてなので、ヒントをいただければ幸いです。