3

私が持っているデータを処理するために、以前にスキーマを抽出しているので、データセットを読み取るときに、スキーマを推測するという高価な手順を実行する代わりにスキーマを提供します。

スキーマを構築するには、いくつかの異なるスキーマを最終的なスキーマにマージする必要があるため、メソッドunion (++)distinctメソッドを使用していますが、org.apache.spark.sql.AnalysisException: Duplicate column(s)例外が発生し続けます。

たとえば、次の構造に 2 つのスキーマがあるとします。

val schema1 = StructType(StructField("A", StructType(
    StructField("i", StringType, true) :: Nil
    ), true) :: Nil)

val schema2 = StructType(StructField("A", StructType(
    StructField("i", StringType, true) :: Nil
    ), true) :: Nil)

val schema3 = StructType(StructField("A", StructType(
    StructField("i", StringType, true) ::
    StructField("ii", StringType, true) :: Nil
    ), true) :: Nil)

val final_schema = (schema1 ++ schema2 ++ schema3).distinct

println(final_schema)

出力:

StructType(
    StructField(A,StructType(
         StructField(i,StringType,true)),true), 
    StructField(A,StructType(
        StructField(i,StringType,true),    
        StructField(ii,StringType,true)),true))

別のスキーマと完全に一致するスキーマ構造のみが によって除外されることを理解していますdistinct。ただし、結果を次のようにしたい:

StructType(
    StructField(A,StructType(
        StructField(i,StringType,true),    
        StructField(ii,StringType,true)),true))

すべてが 1 つのスキーマに「結合」されます。scala ドキュメントのすべてのメソッドをふるいにかけましたが、これを解決する正しいメソッドが見つからないようです。何か案は?

編集:

最終的な目標は、メソッドを使用して JSON 文字列の RDD にフィードしfinal_schemasqlContext.read.schema読み取るreadことです。

4

2 に答える 2

0

次のようなことを試してください:

(schema1 ++ schema2 ++ schema3).groupBy(getKey).map(_._2.head)

ここgetKeyで、スキーマから、マージを検討するプロパティ (たとえば、列名またはサブ フィールドの名前) に移動する関数です。関数ではmap、頭を取るか、より複雑な関数を使用して特定のスキーマを保持できます。

于 2016-12-28T07:19:45.060 に答える