0

申し訳ありませんが、もう一度質問する必要があります。これが重複していないことを願っています。私は最後のものを編集しましたが、編集されたバージョンを見た人は誰もいなかったと思います。これは問題の短い例です:

val spark = SparkSession
.builder()
.appName("test")
.getOrCreate()

val field = StructField("1", BooleanType, false)
val schema = StructType(field::Nil)
val rowRDD = spark.sparkContext.parallelize(Array(Row(true),Row(false)))
val df = spark.createDataFrame(rowRDD, schema)

val new_df = //Add hundred of new columns

//here is the error
val df_2 = new_df.flatMap(row => if(test(row)) row::Nil else Nil)

エラー:

error: Unable to find encoder for type stored in a Dataset.  
Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._  
Support for serializing other types will be added in future releases.

私がやりたいことは、各行を変更することです。この場合、列が1つしかないことがわかり、データフレーム行を更新された行にマップしようとしているときにエンコーダーエラーのように処理できます。しかし、何百もの列がある場合、どうすれば問題を解決できますか? 条件を満たさない行をいくつか削除したい。現時点で私は使用しています:

val df_2 = new_df.rdd.flatMap(row => if(test(row)) row::Nil else Nil)

しかし、これが最善の解決策だとは思いません。また、 StackoverflowErrorで実行します:

Exception in thread "main" java.lang.StackOverflowError
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at scala.collection.immutable.List$SerializationProxy.writeObject(List.scala:468)

助けてくれたTY:)

4

1 に答える 1

0

新しい列を追加する withColumn() オプションは、データセット全体で機能します。さらに列が増えると、事態はさらに悪化します。Spark SQL を使用して、SQL スタイルのクエリを作成し、新しい列を追加できます。これには、spark だけでなく、より多くの SQL スキルが必要になります。しかも100本だとメンテナンスが大変かも。

別のアプローチに従うことができます。

rdd をデータフレームに変換できます。次に、データ フレームでマップを使用し、必要に応じて各行を処理します。インサイドマップメソッド、

を。計算に基づいて新しい値を収集できます

b. 以下のように、これらの新しい列の値をメインの rdd に追加します。

val newColumns: Seq[Any] = Seq(newcol1,newcol2)
Row.fromSeq(row.toSeq.init ++ newColumns)

ここで行は、マップメソッドの行の参照です

c. 以下のように新しいスキーマを作成します

val newColumnsStructType = StructType{Seq(new StructField("newcolName1",IntegerType),new StructField("newColName2", IntegerType))

d. 古いスキーマに追加

val newSchema = StructType(mainDataFrame.schema.init ++ newColumnsStructType)

e. 新しい列で新しいデータフレームを作成する

val newDataFrame = sqlContext.createDataFrame(newRDD, newSchema)
于 2016-12-05T16:08:51.983 に答える