9

基本的に、Spark を使用して HDFS で複雑な JSON を分析する必要があります。

「for Comprehensions」を使用してJSONを(事前に)フィルタリングし、json4sの「extract」メソッドを使用してケースクラスにラップします

これはうまくいきます!

def foo(rdd: RDD[String]) = {

case class View(C: String,b: Option[Array[List[String]]],  t: Time)
case class Time($numberLong: String)
implicit val formats = DefaultFormats

rdd.map { jsonString =>
  val jsonObj = parse(jsonString)
  val listsOfView = for {
    JObject(value) <- jsonObj
    JField(("v"), JObject(views)) <- value
    normalized <- views.map(x => (x._2))
  } yield normalized
}

ここまでは順調ですね!

(事前に) フィルター処理された JSON を CaseClass に抽出しようとすると、次のようになります。

スレッド "メイン" org.apache.spark.SparkException での例外: ステージの失敗によりジョブが中止されました: タスクはシリアル化できません: java.io.NotSerializableException: org.json4s.DefaultFormats$

ここで抽出したコード:

def foo(rdd: RDD[String]) = {

case class View(C: String,b: Option[Array[List[String]]],  t: Time)
case class Time($numberLong: String)
implicit val formats = DefaultFormats

rdd.map { jsonString =>
  val jsonObj = parse(jsonString)
  val listsOfView = for {
    JObject(value) <- jsonObj
    JField(("v"), JObject(views)) <- value
    normalized <- views.map(x => (x._2))
  } yield normalized.extract[View]
}

私はすでに scala ws で自分のコードを試しました。hdfsとsparkのことは本当に初めてなので、ヒントをいただければ幸いです。

4

3 に答える 3