を使用してクラスタリングメカニズムを構築しようとしています
- Google Dataproc + Spark
- Google ビッグクエリ
- Spark ML KMeans+pipeline を使用してジョブを作成する
次のように:
bigquery でユーザー レベル ベースの機能テーブルを作成する
例: 機能テーブルはどのように表示されるかuserid |x1 |x2 |x3 |x4 |x5 |x6 |x7 |x8 |x9 |x10
00013 |0.01 | 0 |0 |0 |0 |0 |0 |0.06 |0.09 | 0.001- ここに示すように、デフォルト設定のクラスタを起動します。gcloud コマンドライン インターフェースを使用してクラスタを作成し、ジョブを実行します。
- 提供されたスターター コードを使用して、BQ テーブルを読み取り、RDD をデータフレームに変換して、KMeans モデル/パイプラインに渡します。
#!/usr/bin/python
"""BigQuery I/O PySpark example."""
import json
import pprint
import subprocess
import pyspark
import numpy as np
from pyspark.ml.clustering import KMeans
from pyspark import SparkContext
from pyspark.ml import Pipeline
from pyspark.sql import SQLContext
from pyspark.mllib.linalg import Vectors, _convert_to_vector
from pyspark.sql.types import Row
from pyspark.mllib.common import callMLlibFunc, callJavaFunc, _py2java, _java2py
sc = pyspark.SparkContext()
# Use the Google Cloud Storage bucket for temporary BigQuery export data used by the InputFormat.
# This assumes the Google Cloud Storage connector for Hadoop is configured.
bucket = sc._jsc.hadoopConfiguration().get('fs.gs.system.bucket')
project = sc._jsc.hadoopConfiguration().get('fs.gs.project.id')
input_directory ='gs://{}/hadoop/tmp/bigquery/pyspark_input'.format(bucket)
conf = {# Input Parameters
'mapred.bq.project.id': project,
'mapred.bq.gcs.bucket': bucket,
'mapred.bq.temp.gcs.path': input_directory,
'mapred.bq.input.project.id': 'my-project',
'mapred.bq.input.dataset.id': 'tempData',
'mapred.bq.input.table.id': 'userFeatureInBQ'}
# Load data in from BigQuery.
table_data = sc.newAPIHadoopRDD(
'com.google.cloud.hadoop.io.bigquery.JsonTextBigQueryInputFormat',
'org.apache.hadoop.io.LongWritable',
'com.google.gson.JsonObject',conf=conf)
# Tranform the userid-Feature table into feature_data RDD
feature_data = (
table_data
.map(lambda (_, record): json.loads(record))
.map(lambda x:(x['x0'],x['x1'],x['x2'],x['x3'],x['x4'],
x['x5'],x['x6'],x['x7'],x['x8'],
x['x9'],x['x10'])))
# Function to convert each line in RDD into an array, return the vector
def parseVector(values):
array = np.array([float(v) for v in values])
return _convert_to_vector(array)
# Convert the RDD into a row wise RDD
data = feature_data.map(parseVector)
row_rdd = data.map(lambda x: Row(x))
sqlContext = SQLContext(sc)
# cache the RDD to improve performance
row_rdd.cache()
# Create a Dataframe
df = sqlContext.createDataFrame(row_rdd, ["features"])
# cache the Dataframe
df.cache()
コンソールに出力するスキーマと head() は次のとおりです。
|-- features: vector (nullable = true)
[Row(features=DenseVector([0.01,0,0,0,0,0,0,0.06,0.09,0.001]))]
- 次の方法でクラスタリング KMeans アルゴリズムを実行します。
- モデルを複数回実行する
- 異なるパラメーターを使用 (つまり、#clusters と init_mode を変更)
- エラーまたはコスト指標を計算する
- 最適なモデルとパラメーターの組み合わせを選択する
- KMeans を推定量としてパイプラインを作成する
- paramMap を使用して複数のパラメーターを渡す
#Define the paramMap & model
paramMap = ({'k':3,'initMode':'kmeans||'},{'k':3,'initMode':'random'},
{'k':4,'initMode':'kmeans||'},{'k':4,'initMode':'random'},
{'k':5,'initMode':'kmeans||'},{'k':5,'initMode':'random'},
{'k':6,'initMode':'kmeans||'},{'k':6,'initMode':'random'},
{'k':7,'initMode':'kmeans||'},{'k':7,'initMode':'random'},
{'k':8,'initMode':'kmeans||'},{'k':8,'initMode':'random'},
{'k':9,'initMode':'kmeans||'},{'k':9,'initMode':'random'},
{'k':10,'initMode':'kmeans||'},{'k':10,'initMode':'random'})
km = KMeans()
# Create a Pipeline with estimator stage
pipeline = Pipeline(stages=[km])
# Call & fit the pipeline with the paramMap
models = pipeline.fit(df, paramMap)`
print models
警告付きで次の出力が得られます
7:03:24 WARN org.apache.spark.mllib.clustering.KMeans: The input data was not directly cached, which may hurt performance if its parent RDDs are also uncached.
[PipelineModel_443dbf939b7bd3bf7bfc, PipelineModel_4b64bb761f4efe51da50, PipelineModel_4f858411ac19beacc1a4, PipelineModel_4f58b894f1d14d79b936, PipelineModel_4b8194f7a5e6be6eaf33, PipelineModel_4fc5b6370bff1b4d7dba, PipelineModel_43e0a196f16cfd3dae57, PipelineModel_47318a54000b6826b20e, PipelineModel_411bbe1c32db6bf0a92b, PipelineModel_421ea1364d8c4c9968c8, PipelineModel_4acf9cdbfda184b00328, PipelineModel_42d1a0c61c5e45cdb3cd, PipelineModel_4f0db3c394bcc2bb9352, PipelineModel_441697f2748328de251c, PipelineModel_4a64ae517d270a1e0d5a, PipelineModel_4372bc8db92b184c05b0]
#Print the cluster centers:
for model in models:
print vars(model)
print model.stages[0].clusterCenters()
print model.extractParamMap()
出力:
[array([7.64676638e-07, 3.58531391e-01, 1.68879698e-03, 0.00000000e+00, 1.53477043e-02, 1.25822915e-02, 0.00000000e+00, 6.93060772e-07, 1.41766847e-03, 1.60941306e-02], array([2.36494105e-06, 1.87719732e-02, 3.73829379e-03, 0.00000000e+00, 4.20724542e-02, 2.28675684e-02, 0.00000000e+00, 5.45002249e-06, 1.17331153e-02, 1.24364600e-02])
ここに質問のリストがあり、ヘルプが必要です。
- すべてのモデルの配列として 2 つのクラスター センターのみを含むリストを取得します。
- パイプラインにアクセスしようとすると、KMeans モデルがデフォルトで k=2 になっているようです。なぜこれが起こるのでしょうか?
- 最後のループは、pipelineModel と 0 番目のステージにアクセスし、clusterCenter() メソッドを実行することになっていますか? これは正しい方法ですか?
- データがキャッシュされていないというエラーが表示されるのはなぜですか?
- パイプラインを使用する場合、WSSSE または .computeCost()(mllib 用) のような同等のメソッドを計算する方法が見つかりませんでした。異なるパラメーターに基づいて異なるモデルを比較するにはどうすればよいですか?
- ソースコードhereで定義されているように、次のコードを実行して .computeCost メソッドを実行しようとしました。
- これは、パイプラインを使用して KMeans モデルとモデル選択を並行して実行する目的に反しますが、次のコードを試しました。
#computeError
def computeCost(model, rdd):`
"""Return the K-means cost (sum of squared distances of
points to their nearest center) for this model on the given data."""
cost = callMLlibFunc("computeCostKmeansModel",
rdd.map(_convert_to_vector),
[_convert_to_vector(c) for c in model.clusterCenters()])
return cost
cost= np.zeros(len(paramMap))
for i in range(len(paramMap)):
cost[i] = cost[i] + computeCost(model[i].stages[0], feature_data)
print cost
これにより、ループの最後に次のように出力されます。
[ 634035.00294687 634035.00294687 634035.00294687 634035.00294687
634035.00294687 634035.00294687 634035.00294687 634035.00294687
634035.00294687 634035.00294687 634035.00294687 634035.00294687
634035.00294687 634035.00294687 634035.00294687 634035.00294687]
- 計算されたコスト/エラーは各モデルで同じですか? この場合も、正しいパラメーターを使用して pipelineModel にアクセスできません。
どんな助け/指導も大歓迎です! ありがとう!