10

一度に複数の文字列列にインデックスを付けることができるパイプラインを作成できましたが、インデックス作成とは異なり、エンコーダーは推定器ではないため、それらのエンコードに行き詰まっています。ドキュメント

import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler, 

OneHotEncoder}
import org.apache.spark.ml.Pipeline

val data = sqlContext.read.parquet("s3n://map2-test/forecaster/intermediate_data")

val df = data.select("win","bid_price","domain","size", "form_factor").na.drop()


//indexing columns
val stringColumns = Array("domain","size", "form_factor")
val index_transformers: Array[org.apache.spark.ml.PipelineStage] = stringColumns.map(
  cname => new StringIndexer()
    .setInputCol(cname)
    .setOutputCol(s"${cname}_index")
)

// Add the rest of your pipeline like VectorAssembler and algorithm
val index_pipeline = new Pipeline().setStages(index_transformers)
val index_model = index_pipeline.fit(df)
val df_indexed = index_model.transform(df)


//encoding columns
val indexColumns  = df_indexed.columns.filter(x => x contains "index")
val one_hot_encoders: Array[org.apache.spark.ml.PipelineStage] = indexColumns.map(
    cname => new OneHotEncoder()
     .setInputCol(cname)
     .setOutputCol(s"${cname}_vec")
)



val one_hot_pipeline = new Pipeline().setStages(one_hot_encoders)
val df_encoded = one_hot_pipeline.transform(df_indexed)

OneHotEncoder オブジェクトには fit メソッドがないため、インデクサーと同じパイプラインに配置しても機能しません。パイプラインで fit を呼び出すと、エラーがスローされます。また、パイプライン ステージの配列で作成したパイプラインで変換を呼び出すこともできませんone_hot_encoders

エンコードするすべての列に対して、その変換自体を個別に作成して変換を呼び出すことなく、OneHotEncoder を使用するための適切な解決策が見つかりませんでした

4

1 に答える 1

6

火花 >= 3.0 :

Spark 3.0OneHotEncoderEstimatorでは、次のように名前が変更されましたOneHotEncoder

import org.apache.spark.ml.feature.{OneHotEncoder, OneHotEncoderModel}

val encoder = new OneHotEncoder()
  .setInputCols(indexColumns)
  .setOutputCols(indexColumns map (name => s"${name}_vec"))

火花 >= 2.3

Spark 2.3 では、新しいクラスOneHotEncoderEstimator、が導入されました。これは、 のOneHotEncoderModel外で使用する場合でもフィッティングが必要Pipelineであり、同時に複数の列を操作します。

import org.apache.spark.ml.feature.{OneHotEncoderEstimator, OneHotEncoderModel}

val encoder = new OneHotEncoderEstimator()
  .setInputCols(indexColumns)
  .setOutputCols(indexColumns map (name => s"${name}_vec"))


encoder.fit(df_indexed).transform(df_indexed)

火花 < 2.3

使用するトランスフォーマーがフィッティングを必要としない場合でも、fitメソッドを使用して、データの変換に使用できる を作成する必要PipelineModelがあります。

one_hot_pipeline.fit(df_indexed).transform(df_indexed)

補足として、インデックス作成とエンコーディングを単一の に組み合わせることができますPipeline:

val pipeline = new Pipeline()
  .setStages(index_transformers ++ one_hot_encoders)

val model = pipeline.fit(df)
model.transform(df)

編集

表示されるエラーは、列の 1 つに空の が含まれていることを意味しますString。インデクサーによって受け入れられますが、エンコードには使用できません。要件に応じて、これらを削除するか、ダミー ラベルを使用できます。残念ながら、 SPARK-11569)が解決さNULLsれるまでは使用できません。

于 2015-12-08T22:22:08.033 に答える