4

私はspark 2.0.1を使用しており、nan値を列の最後の適切な既知の値で埋めたいと思っています。

私が見つけることができたスパークの唯一のリファレンスは、 Spark / Scala: 最後の観測によるフォワードフィルまたはRDDを使用していると思われるpysparkで以前に既知の適切な値でnullを埋めることです。

データフレーム/データセットの世界にとどまり、複数の nan 値を処理できるようにしたいと考えています。これは可能ですか?

私の仮定では、データ (たとえば、CSV ファイルから最初にロードされたもの) は時間順に並べられ、この順序は分散設定で保持されます。たとえば、近い値/最後の適切な既知の値で埋めることは正しいです。おそらく、ほとんどの場合、以前の値で埋めることで十分です。連続する 2 つ以上の nan レコードはありません. これは実際に成り立ちますか? ポイントは、

myDf.sort("foo").show

あらゆる順序を破壊します。たとえば、すべてのnull値が最初に来ます。

小さな例:

import java.sql.{ Date, Timestamp }
case class FooBar(foo:Date, bar:String)
val myDf = Seq(("2016-01-01","first"),("2016-01-02","second"),("2016-wrongFormat","noValidFormat"), ("2016-01-04","lastAssumingSameDate"))
         .toDF("foo","bar")
         .withColumn("foo", 'foo.cast("Date"))
         .as[FooBar]

結果は

+----------+--------------------+
|       foo|                 bar|
+----------+--------------------+
|2016-01-01|               first|
|2016-01-02|              second|
|      null|       noValidFormat|
|2016-01-04|lastAssumingSameDate|
+----------+--------------------+

最後の適切な既知の値で値を修正したいと思います。どうすればこれを達成できますか?

+----------+--------------------+
|       foo|                 bar|
+----------+--------------------+
|2016-01-01|               first|
|2016-01-02|              second|
|2016-01-02|       noValidFormat|
|2016-01-04|lastAssumingSameDate|
+----------+--------------------+

編集

私の場合、非常に限られた誤った値しかないため、上の行の値を入力するだけで十分です。

編集2

インデックス列を追加しようとしています

val myDf = Seq(("2016-01-01", "first"), ("2016-01-02", "second"), ("2016-wrongFormat", "noValidFormat"), ("2016-01-04", "lastAssumingSameDate"))
    .toDF("foo", "bar")
    .withColumn("foo", 'foo.cast("Date"))
    .as[FooBar]
    .withColumn("rowId", monotonically_increasing_id())

そして、最後の値を入力します。

myDf.withColumn("fooLag", lag('foo, 1) over Window.orderBy('rowId)).show

ただし、次の警告が表示 されます。ウィンドウ操作にパーティションが定義されていません! すべてのデータを 1 つのパーティションに移動すると、パフォーマンスが大幅に低下する可能性があります。どうすれば意味のあるパーティションを導入できますか?

+----------+--------------------+-----+----------+
|       foo|                 bar|rowId|    fooLag|
+----------+--------------------+-----+----------+
|2016-01-01|               first|    0|      null|
|2016-01-02|              second|    1|2016-01-01|
|      null|       noValidFormat|    2|2016-01-02|
|2016-01-04|lastAssumingSameDate|    3|      null|
+----------+--------------------+-----+----------+
4

2 に答える 2

1

これは中間の答えです。ただし、パーティションがない/単一のパーティションのみが使用されるため、それほど優れていません。私はまだ問題を解決するためのより良い方法を探しています

df
    .withColumn("rowId", monotonically_increasing_id())
    .withColumn("replacement", lag('columnWithNull, 1) over Window.orderBy('rowId))
    .withColumn("columnWithNullReplaced",
      when($"columnWithNull" isNull, "replacement").otherwise($"columnWithNull")

    )

編集

mapPartitionsWithIndex https://gist.github.com/geoHeil/6a23d18ccec085d486165089f9f430f2を使用してより良いソリューションの構築に取り組んでいますが、まだ完成していません。

編集2

追加

if (i == 0) {
          lastNotNullRow = toCarryBd.value.get(i + 1).get
        } else {
          lastNotNullRow = toCarryBd.value.get(i - 1).get
        }

望ましい結果につながります。

于 2016-11-17T20:04:59.957 に答える