私が知る限り、これは現在のバージョン (1.2.1) ではサポートされていません。ネイティブ Scala コード (tree.py) に対する Python ラッパーは、それぞれの Scala 対応物 (treeEnsembleModels.scala) を呼び出す「予測」関数のみを定義します。後者は、バイナリ決定の間で投票を行うことによって決定を下します。よりクリーンな解決策は、任意にしきい値を設定したり、sklearn のように ROC 計算に使用したりできる確率的予測を提供することでした。この機能は、将来のリリースで追加する必要があります。
回避策として、predict_proba を純粋な Python 関数として実装しました (以下の例を参照)。フォレスト内の個々の決定木のセットに対してループを実行するため、エレガントでも非常に効率的でもありません。その秘訣 (というか汚いハック) は、Java デシジョン ツリー モデルの配列にアクセスし、それらを対応する Python モデルにキャストすることです。その後、データセット全体で個々のモデルの予測を計算し、「zip」を使用して RDD に合計を蓄積できます。木の数で割ると、望ましい結果が得られます。大規模なデータセットの場合、マスター ノード内の少数の決定木に対するループは許容されます。
以下のコードは、Python を Spark (Java で実行) に統合するのが難しいため、ややこしいものになっています。複雑なデータをワーカー ノードに送信しないように十分に注意する必要があります。これにより、シリアライゼーションの問題によるクラッシュが発生します。Spark コンテキストを参照するコードをワーカー ノードで実行することはできません。また、Java コードを参照するコードはシリアライズできません。たとえば、以下のコードでは、ntrees の代わりに len(trees) を使用したくなるかもしれません。Java/Scala でこのようなラッパーを作成すると、たとえば、ワーカー ノードで決定木に対してループを実行して通信コストを削減することで、はるかに洗練されたものにすることができます。
以下のテスト関数は、predict_proba が元の例で使用された予測と同じテスト エラーを与えることを示しています。
def predict_proba(rf_model, data):
'''
This wrapper overcomes the "binary" nature of predictions in the native
RandomForestModel.
'''
# Collect the individual decision tree models by calling the underlying
# Java model. These are returned as JavaArray defined by py4j.
trees = rf_model._java_model.trees()
ntrees = rf_model.numTrees()
scores = DecisionTreeModel(trees[0]).predict(data.map(lambda x: x.features))
# For each decision tree, apply its prediction to the entire dataset and
# accumulate the results using 'zip'.
for i in range(1,ntrees):
dtm = DecisionTreeModel(trees[i])
scores = scores.zip(dtm.predict(data.map(lambda x: x.features)))
scores = scores.map(lambda x: x[0] + x[1])
# Divide the accumulated scores over the number of trees
return scores.map(lambda x: x/ntrees)
def testError(lap):
testErr = lap.filter(lambda (v, p): v != p).count() / float(testData.count())
print('Test Error = ' + str(testErr))
def testClassification(trainingData, testData):
model = RandomForest.trainClassifier(trainingData, numClasses=2,
categoricalFeaturesInfo={},
numTrees=50, maxDepth=30)
# Compute test error by thresholding probabilistic predictions
threshold = 0.5
scores = predict_proba(model,testData)
pred = scores.map(lambda x: 0 if x < threshold else 1)
lab_pred = testData.map(lambda lp: lp.label).zip(pred)
testError(lab_pred)
# Compute test error by comparing binary predictions
predictions = model.predict(testData.map(lambda x: x.features))
labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)
testError(labelsAndPredictions)
全体として、これは Spark を学ぶための良い練習になりました。