11

Spark ジョブで json を解析しようとしたときに問題が発生しました。spark 1.1.0json4s、およびを使用していCassandra Spark Connectorます。スローされる例外は次のとおりです。

java.io.NotSerializableException: org.json4s.DefaultFormats

DefaultFormats コンパニオン オブジェクトを調べると、このスタックの質問により、DefaultFormats をシリアル化できないことが明らかです。問題は今何をすべきかです。

このチケットは、spark コード ベースでキーワード transient を追加することで、この問題に対処しているように見えますが、それを私のケースに適用する方法や場所が正確にはわかりません。シリアライゼーションをすべて回避するために、エグゼキューターで DefaultFormats クラスのみをインスタンス化するソリューションはありますか? 人々が使用している scala/spark 用の別の JSON 解析ライブラリはありますか? 最初は jackson を単独で使用しようとしましたが、簡単に解決できない注釈でいくつかのエラーが発生し、json4s はすぐに使用できました。これが私のコードです:

import org.json4s._
import org.json4s.jackson.JsonMethods._
implicit val formats = DefaultFormats

val count = rdd.map(r => checkUa(r._2, r._1)).reduce((x, y) => x + y) 

checkUa 関数で json の解析を行います。何らかの形で実行を遅らせることを期待して、カウントを遅延させてみましたが、効果はありませんでした。おそらく、checkUA 内で暗黙の val を移動しますか? アドバイスをいただければ幸いです。

4

2 に答える 2

19

これは、 json4s を使用したオープン チケットで既に回答されています。implicit回避策は、宣言を関数内に置くことです

val count = rdd
               .map(r => {implicit val formats = DefaultFormats; checkUa(r._2, r._1)})
               .reduce((x, y) => x + y) 
于 2015-04-15T23:05:01.757 に答える
3

implicit val formats = ...クラス(オブジェクト)で宣言するのではなく、解析を含むメソッド内に宣言を入れると、同じエラーが発生しました。

したがって、これはエラーをスローします。

object Application {

  //... Lots of other code here, which eventually calls 
  // setupStream(...)

  def setupStream(streamingContext: StreamingContext,
                          brokers: String,
                          topologyTopicName: String) = {
    implicit val formats = DefaultFormats
    _createDStream(streamingContext, brokers, topologyTopicName)
      // Remove the message key, which is always null in our case
      .map(_._2)
      .map((json: String) => parse(json).camelizeKeys
        .extract[Record[TopologyMetadata, Unused]])
      .print()
}

しかし、これは問題ありません:

object Application {

  implicit val formats = DefaultFormats

  //... Lots of other code here, which eventually calls 
  // setupStream(...)

  def setupStream(streamingContext: StreamingContext,
                          brokers: String,
                          topologyTopicName: String) = {
    _createDStream(streamingContext, brokers, topologyTopicName)
      // Remove the message key, which is always null in our case
      .map(_._2)
      .map((json: String) => parse(json).camelizeKeys
        .extract[Record[TopologyMetadata, Unused]])
      .print()
}
于 2016-04-25T12:45:57.113 に答える