一度に複数の文字列列にインデックスを付けることができるパイプラインを作成できましたが、インデックス作成とは異なり、エンコーダーは推定器ではないため、それらのエンコードに行き詰まっています。ドキュメント。
import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler,
OneHotEncoder}
import org.apache.spark.ml.Pipeline
val data = sqlContext.read.parquet("s3n://map2-test/forecaster/intermediate_data")
val df = data.select("win","bid_price","domain","size", "form_factor").na.drop()
//indexing columns
val stringColumns = Array("domain","size", "form_factor")
val index_transformers: Array[org.apache.spark.ml.PipelineStage] = stringColumns.map(
cname => new StringIndexer()
.setInputCol(cname)
.setOutputCol(s"${cname}_index")
)
// Add the rest of your pipeline like VectorAssembler and algorithm
val index_pipeline = new Pipeline().setStages(index_transformers)
val index_model = index_pipeline.fit(df)
val df_indexed = index_model.transform(df)
//encoding columns
val indexColumns = df_indexed.columns.filter(x => x contains "index")
val one_hot_encoders: Array[org.apache.spark.ml.PipelineStage] = indexColumns.map(
cname => new OneHotEncoder()
.setInputCol(cname)
.setOutputCol(s"${cname}_vec")
)
val one_hot_pipeline = new Pipeline().setStages(one_hot_encoders)
val df_encoded = one_hot_pipeline.transform(df_indexed)
OneHotEncoder オブジェクトには fit メソッドがないため、インデクサーと同じパイプラインに配置しても機能しません。パイプラインで fit を呼び出すと、エラーがスローされます。また、パイプライン ステージの配列で作成したパイプラインで変換を呼び出すこともできませんone_hot_encoders
。
エンコードするすべての列に対して、その変換自体を個別に作成して変換を呼び出すことなく、OneHotEncoder を使用するための適切な解決策が見つかりませんでした