17

値が欠落している Spark データフレームがあります。欠損値をその列の平均に置き換えることで、単純な代入を実行したいと思います。私は Spark を初めて使用するので、このロジックの実装に苦労しています。これは私がこれまでにやったことです:

a)単一の列(列Aとしましょう)に対してこれを行うには、次のコード行が機能するようです:

df.withColumn("new_Col", when($"ColA".isNull, df.select(mean("ColA"))
  .first()(0).asInstanceOf[Double])
  .otherwise($"ColA"))

b)ただし、データフレーム内のすべての列に対してこれを行う方法を理解できませんでした。Map 関数を試していましたが、データフレームの各行をループしていると思います

c) SO についても同様の質問があります - here。そして、私はソリューション(集約テーブルと合体を使用)が好きでしたが、各列をループすることでこれを行う方法があるかどうかを非常に知りたいと思っていました(私はRから来たので、次のような高次機能を使用して各列をループしますラップリーは私にはより自然に思えます)。

ありがとう!

4

3 に答える 3

27

火花 >= 2.2

使用できますorg.apache.spark.ml.feature.Imputer(平均戦略と中央戦略の両方をサポートしています)。

スカラ

import org.apache.spark.ml.feature.Imputer

val imputer = new Imputer()
  .setInputCols(df.columns)
  .setOutputCols(df.columns.map(c => s"${c}_imputed"))
  .setStrategy("mean")

imputer.fit(df).transform(df)

パイソン:

from pyspark.ml.feature import Imputer

imputer = Imputer(
    inputCols=df.columns, 
    outputCols=["{}_imputed".format(c) for c in df.columns]
)
imputer.fit(df).transform(df)

火花 < 2.2

はい、どうぞ:

import org.apache.spark.sql.functions.mean

df.na.fill(df.columns.zip(
  df.select(df.columns.map(mean(_)): _*).first.toSeq
).toMap)

どこ

df.columns.map(mean(_)): Array[Column] 

各列の平均を計算し、

df.select(_: *).first.toSeq: Seq[Any]

集計された値を収集し、行を次のように変換しますSeq[Any](最適ではないことはわかっていますが、これは使用する必要がある API です)。

df.columns.zip(_).toMap: Map[String,Any] 

aMap: Map[String, Any]列名からその平均にマップするものを作成し、最後に:

df.na.fill(_): DataFrame

以下を使用して欠損値を埋めます。

fill: Map[String, Any] => DataFrame 

からDataFrameNaFunctions

NaNエントリを無視するには、次のように置き換えます。

df.select(df.columns.map(mean(_)): _*).first.toSeq

と:

import org.apache.spark.sql.functions.{col, isnan, when}


df.select(df.columns.map(
  c => mean(when(!isnan(col(c)), col(c)))
): _*).first.toSeq
于 2016-10-15T13:14:43.163 に答える