[更新: Spark 2.2 以降、PCA と SVD の両方が PySpark で利用可能です - JIRA チケットSPARK-6227とSpark ML 2.2 のPCA & PCAModelを参照してください。以下の元の回答は、古い Spark バージョンにも適用できます。]
信じられないことのように思えますが、実際には、PCA 分解からそのような情報を抽出する方法はありません (少なくとも Spark 1.5 の時点では)。しかし、繰り返しますが、多くの同様の「苦情」がありました。たとえば、CrossValidatorModel
.
幸いなことに、数か月前に、 AMPLab (バークレー) と Databricks (つまり、Spark の作成者) による「スケーラブルな機械学習」 MOOC に参加し、宿題の一部として完全な PCA パイプラインを「手作業で」実装しました。私は当時から自分の関数を変更しました(安心してください、私は完全な信用を得ました:-)、データフレームを入力として(RDDの代わりに)、あなたのものと同じ形式(つまりDenseVectors
、数値機能を含む行)。
estimatedCovariance
まず、次のように中間関数 を定義する必要があります。
import numpy as np
def estimateCovariance(df):
"""Compute the covariance matrix for a given dataframe.
Note:
The multi-dimensional covariance array should be calculated using outer products. Don't
forget to normalize the data by first subtracting the mean.
Args:
df: A Spark dataframe with a column named 'features', which (column) consists of DenseVectors.
Returns:
np.ndarray: A multi-dimensional array where the number of rows and columns both equal the
length of the arrays in the input dataframe.
"""
m = df.select(df['features']).map(lambda x: x[0]).mean()
dfZeroMean = df.select(df['features']).map(lambda x: x[0]).map(lambda x: x-m) # subtract the mean
return dfZeroMean.map(lambda x: np.outer(x,x)).sum()/df.count()
次に、次のようにメインpca
関数を記述できます。
from numpy.linalg import eigh
def pca(df, k=2):
"""Computes the top `k` principal components, corresponding scores, and all eigenvalues.
Note:
All eigenvalues should be returned in sorted order (largest to smallest). `eigh` returns
each eigenvectors as a column. This function should also return eigenvectors as columns.
Args:
df: A Spark dataframe with a 'features' column, which (column) consists of DenseVectors.
k (int): The number of principal components to return.
Returns:
tuple of (np.ndarray, RDD of np.ndarray, np.ndarray): A tuple of (eigenvectors, `RDD` of
scores, eigenvalues). Eigenvectors is a multi-dimensional array where the number of
rows equals the length of the arrays in the input `RDD` and the number of columns equals
`k`. The `RDD` of scores has the same number of rows as `data` and consists of arrays
of length `k`. Eigenvalues is an array of length d (the number of features).
"""
cov = estimateCovariance(df)
col = cov.shape[1]
eigVals, eigVecs = eigh(cov)
inds = np.argsort(eigVals)
eigVecs = eigVecs.T[inds[-1:-(col+1):-1]]
components = eigVecs[0:k]
eigVals = eigVals[inds[-1:-(col+1):-1]] # sort eigenvals
score = df.select(df['features']).map(lambda x: x[0]).map(lambda x: np.dot(x, components.T) )
# Return the `k` principal components, `k` scores, and all eigenvalues
return components.T, score, eigVals
テスト
最初に、Spark ML PCAドキュメントのサンプル データを使用して、既存のメソッドの結果を見てみましょう( all になるように変更しますDenseVectors
)。
from pyspark.ml.feature import *
from pyspark.mllib.linalg import Vectors
data = [(Vectors.dense([0.0, 1.0, 0.0, 7.0, 0.0]),),
(Vectors.dense([2.0, 0.0, 3.0, 4.0, 5.0]),),
(Vectors.dense([4.0, 0.0, 0.0, 6.0, 7.0]),)]
df = sqlContext.createDataFrame(data,["features"])
pca_extracted = PCA(k=2, inputCol="features", outputCol="pca_features")
model = pca_extracted.fit(df)
model.transform(df).collect()
[Row(features=DenseVector([0.0, 1.0, 0.0, 7.0, 0.0]), pca_features=DenseVector([1.6486, -4.0133])),
Row(features=DenseVector([2.0, 0.0, 3.0, 4.0, 5.0]), pca_features=DenseVector([-4.6451, -1.1168])),
Row(features=DenseVector([4.0, 0.0, 0.0, 6.0, 7.0]), pca_features=DenseVector([-6.4289, -5.338]))]
次に、私たちの方法で:
comp, score, eigVals = pca(df)
score.collect()
[array([ 1.64857282, 4.0132827 ]),
array([-4.64510433, 1.11679727]),
array([-6.42888054, 5.33795143])]
定義した関数でメソッドを使用していないことを強調させてください-は、そうあるべきです。collect()
score
RDD
2 番目の列の符号はすべて、既存の方法で導出されたものとは反対であることに注意してください。しかし、これは問題ではありません: (無料でダウンロードできる) An Introduction to Statistical Learning によると、Hastie & Tibshirani 共著、p. 382
各主成分負荷ベクトルは、符号反転まで一意です。これは、2 つの異なるソフトウェア パッケージが同じ主成分負荷ベクトルを生成することを意味しますが、これらの負荷ベクトルの符号は異なる場合があります。各主成分負荷ベクトルは p 次元空間で方向を指定するため、符号が異なる場合があります。方向が変わらないため、符号を反転しても効果はありません。[...] 同様に、Z の分散は −Z の分散と同じであるため、スコア ベクトルは符号反転まで一意です。
最後に、固有値が利用可能になったので、説明された分散のパーセンテージの関数を書くのは簡単です。
def varianceExplained(df, k=1):
"""Calculate the fraction of variance explained by the top `k` eigenvectors.
Args:
df: A Spark dataframe with a 'features' column, which (column) consists of DenseVectors.
k: The number of principal components to consider.
Returns:
float: A number between 0 and 1 representing the percentage of variance explained
by the top `k` eigenvectors.
"""
components, scores, eigenvalues = pca(df, k)
return sum(eigenvalues[0:k])/sum(eigenvalues)
varianceExplained(df,1)
# 0.79439325322305299
テストとして、例のデータで説明されている分散が k=5 の場合に 1.0 であるかどうかも確認します (元のデータは 5 次元であるため)。
varianceExplained(df,5)
# 1.0
[Spark 1.5.0 & 1.5.1 で開発およびテスト済み]