スパーク >= 2.3、>= 3.0
Spark 2.3OneHotEncoder
は推奨されていないため、OneHotEncoderEstimator
. 最近のリリースを使用する場合は、encoder
コードを変更してください
from pyspark.ml.feature import OneHotEncoderEstimator
encoder = OneHotEncoderEstimator(
inputCols=["gender_numeric"],
outputCols=["gender_vector"]
)
Spark 3.0 では、このバリアントは次のように名前が変更されましたOneHotEncoder
。
from pyspark.ml.feature import OneHotEncoder
encoder = OneHotEncoder(
inputCols=["gender_numeric"],
outputCols=["gender_vector"]
)
さらにStringIndexer
、複数の入力列をサポートするように拡張されました。
StringIndexer(inputCols=["gender"], outputCols=["gender_numeric"])
スパーク < 2.3
UDF を作成することはできますが、なぜ作成するのでしょうか? このカテゴリのタスクを処理するために設計されたかなりの数のツールが既にあります。
from pyspark.sql import Row
from pyspark.ml.linalg import DenseVector
row = Row("gender", "foo", "bar")
df = sc.parallelize([
row("0", 3.0, DenseVector([0, 2.1, 1.0])),
row("1", 1.0, DenseVector([0, 1.1, 1.0])),
row("1", -1.0, DenseVector([0, 3.4, 0.0])),
row("0", -3.0, DenseVector([0, 4.1, 0.0]))
]).toDF()
まず第一にStringIndexer
。
from pyspark.ml.feature import StringIndexer
indexer = StringIndexer(inputCol="gender", outputCol="gender_numeric").fit(df)
indexed_df = indexer.transform(df)
indexed_df.drop("bar").show()
## +------+----+--------------+
## |gender| foo|gender_numeric|
## +------+----+--------------+
## | 0| 3.0| 0.0|
## | 1| 1.0| 1.0|
## | 1|-1.0| 1.0|
## | 0|-3.0| 0.0|
## +------+----+--------------+
次へOneHotEncoder
:
from pyspark.ml.feature import OneHotEncoder
encoder = OneHotEncoder(inputCol="gender_numeric", outputCol="gender_vector")
encoded_df = encoder.transform(indexed_df)
encoded_df.drop("bar").show()
## +------+----+--------------+-------------+
## |gender| foo|gender_numeric|gender_vector|
## +------+----+--------------+-------------+
## | 0| 3.0| 0.0|(1,[0],[1.0])|
## | 1| 1.0| 1.0| (1,[],[])|
## | 1|-1.0| 1.0| (1,[],[])|
## | 0|-3.0| 0.0|(1,[0],[1.0])|
## +------+----+--------------+-------------+
VectorAssembler
:
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(
inputCols=["gender_vector", "bar", "foo"], outputCol="features")
encoded_df_with_indexed_bar = (vector_indexer
.fit(encoded_df)
.transform(encoded_df))
final_df = assembler.transform(encoded_df)
カテゴリ変数が含まれている場合は、必要なメタデータを設定するためにbar
使用できます。VectorIndexer
from pyspark.ml.feature import VectorIndexer
vector_indexer = VectorIndexer(inputCol="bar", outputCol="bar_indexed")
しかし、ここではそうではありません。
最後に、パイプラインを使用してすべてをラップできます。
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[indexer, encoder, vector_indexer, assembler])
model = pipeline.fit(df)
transformed = model.transform(df)
おそらく、すべてをゼロから作成するよりも、はるかに堅牢でクリーンなアプローチです。特に、異なるデータセット間で一貫したエンコードが必要な場合は、いくつかの注意事項があります。StringIndexer
詳細については、およびの公式ドキュメントを参照してVectorIndexer
ください。
同等の出力を取得する別の方法はRFormula
whichです。
RFormula
特徴のベクトル列と、ラベルの double または string 列を生成します。線形回帰のために R で数式が使用される場合と同様に、文字列入力列はワンホット エンコードされ、数値列は double にキャストされます。ラベル列が文字列型の場合、最初に で double に変換されStringIndexer
ます。ラベル列が DataFrame に存在しない場合、出力ラベル列は式で指定された応答変数から作成されます。
from pyspark.ml.feature import RFormula
rf = RFormula(formula="~ gender + bar + foo - 1")
final_df_rf = rf.fit(df).transform(df)
ご覧のとおり、はるかに簡潔ですが、構成が難しいため、あまりカスタマイズできません。それにもかかわらず、このような単純なパイプラインの結果は同じになります:
final_df_rf.select("features").show(4, False)
## +----------------------+
## |features |
## +----------------------+
## |[1.0,0.0,2.1,1.0,3.0] |
## |[0.0,0.0,1.1,1.0,1.0] |
## |(5,[2,4],[3.4,-1.0]) |
## |[1.0,0.0,4.1,0.0,-3.0]|
## +----------------------+
final_df.select("features").show(4, False)
## +----------------------+
## |features |
## +----------------------+
## |[1.0,0.0,2.1,1.0,3.0] |
## |[0.0,0.0,1.1,1.0,1.0] |
## |(5,[2,4],[3.4,-1.0]) |
## |[1.0,0.0,4.1,0.0,-3.0]|
## +----------------------+
ご質問について:
Spark SQLクエリで使用できる同様の機能を持つUDFを作成します(または他の方法だと思います)
これは、他のものと同じように単なる UDF です。すべてが正常に機能するように、サポートされているタイプを使用していることを確認してください。
上記のマップから得られた RDD を取得し、それを新しい列として user_data データフレームに追加しますか?
from pyspark.ml.linalg import VectorUDT
from pyspark.sql.types import StructType, StructField
schema = StructType([StructField("features", VectorUDT(), True)])
row = Row("features")
result.map(lambda x: row(DenseVector(x))).toDF(schema)
注:
Spark 1.x の場合は に置き換えpyspark.ml.linalg
ますpyspark.mllib.linalg
。