4

bigquery に読み込まれたデータ テーブルがあり、それを pyspark .py ファイル経由で Spark クラスタにインポートしたいと考えています。

Dataproc + BigQuery の例で見た- 利用可能なものはありますか? scala を使用して spark クラスターに bigquery テーブルをロードする方法がありましたが、pyspark スクリプトでそれを行う方法はありますか?

4

1 に答える 1

5

これは、この質問の@MattJから来ています。Spark で BigQuery に接続し、単語カウントを実行する例を次に示します。

import json
import pyspark
sc = pyspark.SparkContext()

hadoopConf=sc._jsc.hadoopConfiguration()
hadoopConf.get("fs.gs.system.bucket")

conf = {"mapred.bq.project.id": "<project_id>", "mapred.bq.gcs.bucket": "<bucket>",
    "mapred.bq.input.project.id": "publicdata", 
    "mapred.bq.input.dataset.id":"samples", 
    "mapred.bq.input.table.id": "shakespeare"  }

tableData = sc.newAPIHadoopRDD(
    "com.google.cloud.hadoop.io.bigquery.JsonTextBigQueryInputFormat",
    "org.apache.hadoop.io.LongWritable", "com.google.gson.JsonObject", 
    conf=conf).map(lambda k: json.loads(k[1])).map(lambda x: (x["word"],
    int(x["word_count"]))).reduceByKey(lambda x,y: x+y)

print tableData.take(10)

<project_id>プロジェクトの設定を変更し<bucket>て一致させる必要があります。

于 2015-10-29T18:09:06.713 に答える