3

Hyperopt を Spark の MlLib に統合する良い例はありますか? 私は Databricks でそうしようとしてきましたが、同じエラーが引き続き発生します。これが私の目的関数の問題なのか、代わりに pyspark の Spark ML の問題なのか、Databricks にどのようにフックするのかはわかりません。

import itertools
from pyspark.sql import functions as f
from pyspark.sql import DataFrame
from pyspark.sql.types import *

from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.feature import OneHotEncoder, Imputer, VectorAssembler, StringIndexer
from pyspark.ml.classification import RandomForestClassifier, LogisticRegression, GBTClassifier
from pyspark.ml.classification import GBTClassificationModel
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder, CrossValidatorModel
from sklearn.metrics import classification_report, confusion_matrix, roc_auc_score
import numpy as np
from itertools import product
from hyperopt import fmin, hp, tpe, STATUS_OK, SparkTrials
import pandas as pd
from matplotlib import pyplot as plt
from sklearn.model_selection import train_test_split

search_space ={'maxDepth'   : hp.choice("maxDepth", np.arange(3, 8, dtype=int)),
        'maxIter'       : hp.uniform("maxIter", 200,800),
        'featureSubsetStrategy' : str(hp.quniform("featureSubsetStrategy", .5,1,.1)),
        'minInstancesPerNode' : hp.uniform("min_child_weight", 1,10),
        'stepSize'    : hp.loguniform('stepSize', np.log(0.01), np.log(0.1)),
        'subsamplingRate'    : hp.quniform("featureSubsetStrategy", .5,1,.1)   
    }
evaluator = BinaryClassificationEvaluator(labelCol="positive")

def train(params):
  gbtModel = GBTClassifier(labelCol="positive", featuresCol="features").fit(train)
  predictions_val = gbtModel.predict(val.map(lambda x: x.features))
  labelsAndPredictions = val.map(lambda lp: lp.label).zip(predictions_val)
  ROC = evaluator.evaluate(predictions_val, {evaluator.metricName: "areaUnderROC"})

  return {'ROC': ROC, 'status': STATUS_OK}



N_HYPEROPT_PROBES = 1000 #can increase, keep small for testing
EARLY_STOPPING = 50
HYPEROPT_ALGO = tpe.suggest
NB_CV_FOLDS = 5 # for testing, can increase

obj_call_count = 0
cur_best_score = 1000000
spark_trials = SparkTrials(parallelism=4)
best = fmin(fn=train,
             space=search_space,
              algo=HYPEROPT_ALGO,
                     max_evals=N_HYPEROPT_PROBES,
                     trials=spark_trials,
                     verbose=1) 

これを実行すると、次のエラーが表示されます。

Total Trials: 0: 0 succeeded, 0 failed, 0 cancelled. py4j.Py4JException: Method __getstate__([]) does not exist

4

1 に答える 1

1

これが遅すぎるかどうかはわかりませんが、SparkTrials は、scikit-learn ライブラリにあるような単一マシンの ML モデルでのみ機能します。Spark MLib の場合、Trials を使用する必要があります (trials パラメータを fmin 関数に渡す必要はありません)。

詳細については、http: //hyperopt.github.io/hyperopt/scaleout/spark/を参照してください。

SparkTrials は 1 つの Spark ワーカーで各モデルを適合させて評価するため、scikit-learn や単一マシンの TensorFlow など、単一マシンの ML モデルとワークフローの調整に限定されます。Apache Spark MLlib や Horovod などの分散 ML アルゴリズムの場合、Hyperopt のデフォルトの Trials クラスを使用できます。

于 2020-06-05T13:14:28.043 に答える