1

編集:以前の質問の質について申し訳ありませんが、これがより明確になることを願っています:Sparkアプリケーションでは、次のJSONファイルのディレクトリ全体をロードしています:

    {
        "type": "some_type",
        "payload": {
            "data1": {
                "id": "1"           
            },
            "data2": {
                "id": "1",

            },
            "data3": {
                "id": "1"
            },
            "dataset1": [{
                "data11": {
                    "id": "1",
                },
                "data12": {
                    "id": "1",
                }
            }],
            "masterdata": {
                "md1": [{
                    "id": "1"
                },
                {
                    "id": "2"
                },
                {
                    "id": "3"
                }],
                "md2": [{
                    "id": "1",
                },
                {
                    "id": "2",
                },
                {
                    "id": "3",
                }]
            }
        }
    }

後で使用するDataFrameために一時テーブルとして保存します。この Json では、「payload」ノードのフィールドは常に存在しますが、「masterdata」のサブノードはオプションです。次のステップは、次のように Json のサブノードごとに複数の DataFrame を作成することです。最初の部分を処理した後、Spark の状態は次のとおりです。

ここで問題が発生します。ディレクトリ内の JSON ファイルの 1 つに md2 ノードが含まれていない場合、NullPointException が原因で "md2" DataFrameshow()でも実行できません。collect()すべてのファイルに「md2」ノードがない場合は理解できるので、md2 DataFrame を作成できませんでしたが、この場合、md2 DataFrame には、ノード md2 を持たず、他のすべてを含む json ファイルからのデータがないことが予想されます。

技術的な詳細: ネストされたノードからデータを読み取るには、rdd.map と rdd.flatmap を使用し、それをDataFrameカスタム列名に変換します

ディレクトリ内のすべてのファイルにすべてのノードが含まれているときにアプリケーションを実行すると、すべてが機能しますが、単一のファイルが md2 ノードにない場合、アプリケーションは .show() または .collect() で失敗します

ところで、ノードが存在するが空の場合、すべて正常に動作します。

Spark でオプションの Json ノードをサポートしたり、rdd.map&flatmap 内の欠落したノードを処理したりする方法はありますか?

前の質問よりも明確であることを願っています

@Berylium リクエストで、md2 DataFrame を取得するために使用している rdd 操作を次に示します。

    val jsonData = hiveContext.sql("SELECT `payload`.masterdata.md2 FROM jsonData")
    val data = jsonData.rdd.flatMap(row => row.getSeq[Row](0)).map(row => (
    row.getString(row.fieldIndex("id"))
    )).distinct
    val dataDF = data.toDF("id")    
4

1 に答える 1

3

クイックフィックス

filter()次のように挿入してみてください。

sqlContext.sql("SELECT payload.masterdata.md2 FROM jsonData")
  .rdd
  .filter(_.getSeq[Row](0) != null)
  .flatMap(row => row.getSeq[Row](0))
  .map(row => (row.getString(row.fieldIndex("id"))))
  .distinct
  .toDF("id")
  .show()

爆発()の使用

これにより、null 値ができるだけ早く削除されます。したがって、より高速になるはずです (少なくとも短くなります)。

sqlContext
  .sql("select t.a.id from (SELECT explode(payload.masterdata.md2) as a FROM jsonData) t")
  • explode()を爆発させnullます。
  • 次に、サブクエリは ID のみを抽出します

さらに簡単: 最初に ID を抽出してからexplode():

sqlContext.sql("SELECT explode(payload.masterdata.md2.id) FROM jsonData").show()
于 2015-11-27T11:23:58.990 に答える