数十万のオブジェクトを含むGAEデータストアの種類があります。いくつかの関連するクエリ(カウントクエリを含む)を実行したい。Big Queryは、これを行うのに最適なようです。
現在、Big Queryを使用してライブAppEngineデータストアにクエリを実行する簡単な方法はありますか?
数十万のオブジェクトを含むGAEデータストアの種類があります。いくつかの関連するクエリ(カウントクエリを含む)を実行したい。Big Queryは、これを行うのに最適なようです。
現在、Big Queryを使用してライブAppEngineデータストアにクエリを実行する簡単な方法はありますか?
データストアエンティティでBigQueryを直接実行することはできませんが、データストアからエンティティを読み取り、Google Cloud StorageのCSVに書き込んでから、それらをBigQueryに取り込むマッパーパイプラインを作成できます。プロセスを自動化することもできます。データストアからCSVへのステップだけに マッパーAPIクラスを使用する例を次に示します。
import re
import time
from datetime import datetime
import urllib
import httplib2
import pickle
from google.appengine.ext import blobstore
from google.appengine.ext import db
from google.appengine.ext import webapp
from google.appengine.ext.webapp.util import run_wsgi_app
from google.appengine.ext.webapp import blobstore_handlers
from google.appengine.ext.webapp import util
from google.appengine.ext.webapp import template
from mapreduce.lib import files
from google.appengine.api import taskqueue
from google.appengine.api import users
from mapreduce import base_handler
from mapreduce import mapreduce_pipeline
from mapreduce import operation as op
from apiclient.discovery import build
from google.appengine.api import memcache
from oauth2client.appengine import AppAssertionCredentials
#Number of shards to use in the Mapper pipeline
SHARDS = 20
# Name of the project's Google Cloud Storage Bucket
GS_BUCKET = 'your bucket'
# DataStore Model
class YourEntity(db.Expando):
field1 = db.StringProperty() # etc, etc
ENTITY_KIND = 'main.YourEntity'
class MapReduceStart(webapp.RequestHandler):
"""Handler that provides link for user to start MapReduce pipeline.
"""
def get(self):
pipeline = IteratorPipeline(ENTITY_KIND)
pipeline.start()
path = pipeline.base_path + "/status?root=" + pipeline.pipeline_id
logging.info('Redirecting to: %s' % path)
self.redirect(path)
class IteratorPipeline(base_handler.PipelineBase):
""" A pipeline that iterates through datastore
"""
def run(self, entity_type):
output = yield mapreduce_pipeline.MapperPipeline(
"DataStore_to_Google_Storage_Pipeline",
"main.datastore_map",
"mapreduce.input_readers.DatastoreInputReader",
output_writer_spec="mapreduce.output_writers.FileOutputWriter",
params={
"input_reader":{
"entity_kind": entity_type,
},
"output_writer":{
"filesystem": "gs",
"gs_bucket_name": GS_BUCKET,
"output_sharding":"none",
}
},
shards=SHARDS)
def datastore_map(entity_type):
props = GetPropsFor(entity_type)
data = db.to_dict(entity_type)
result = ','.join(['"%s"' % str(data.get(k)) for k in props])
yield('%s\n' % result)
def GetPropsFor(entity_or_kind):
if (isinstance(entity_or_kind, basestring)):
kind = entity_or_kind
else:
kind = entity_or_kind.kind()
cls = globals().get(kind)
return cls.properties()
application = webapp.WSGIApplication(
[('/start', MapReduceStart)],
debug=True)
def main():
run_wsgi_app(application)
if __name__ == "__main__":
main()
これをIteratorPipelineクラスの最後に追加すると、次のようにyield CloudStorageToBigQuery(output)
、結果のcsvファイルハンドルをBigQuery取り込みパイプにパイプできます...
class CloudStorageToBigQuery(base_handler.PipelineBase):
"""A Pipeline that kicks off a BigQuery ingestion job.
"""
def run(self, output):
# BigQuery API Settings
SCOPE = 'https://www.googleapis.com/auth/bigquery'
PROJECT_ID = 'Some_ProjectXXXX'
DATASET_ID = 'Some_DATASET'
# Create a new API service for interacting with BigQuery
credentials = AppAssertionCredentials(scope=SCOPE)
http = credentials.authorize(httplib2.Http())
bigquery_service = build("bigquery", "v2", http=http)
jobs = bigquery_service.jobs()
table_name = 'datastore_dump_%s' % datetime.utcnow().strftime(
'%m%d%Y_%H%M%S')
files = [str(f.replace('/gs/', 'gs://')) for f in output]
result = jobs.insert(projectId=PROJECT_ID,
body=build_job_data(table_name,files)).execute()
logging.info(result)
def build_job_data(table_name, files):
return {"projectId": PROJECT_ID,
"configuration":{
"load": {
"sourceUris": files,
"schema":{
# put your schema here
"fields": fields
},
"destinationTable":{
"projectId": PROJECT_ID,
"datasetId": DATASET_ID,
"tableId": table_name,
},
}
}
}
新しい(2013年9月以降の)ストリーミングインサートAPIを使用すると、アプリからBigQueryにレコードをインポートできます。
データはBigQueryですぐに利用できるため、ライブ要件を満たす必要があります。
この質問は少し古くなっていますが、これはこの質問に出くわした人にとってはより簡単な解決策かもしれません
現時点では、ローカル開発サーバーからこれを機能させることは、せいぜいパッチが当てられています。
データストアからBigQueryに移行するためのTrustedTesterプログラムを2つの簡単な操作で実行しています。
スキーマが自動的に処理されます。
BigQueryの場合、それらの種類をCSVまたは区切りレコード構造にエクスポートし、BigQueryにロードすると、クエリを実行できます。ライブGAEデータストアにクエリを実行できる機能はありません。
Biqueryは分析クエリエンジンであり、レコードを変更できないことを意味します。更新または削除は許可されていません。追加のみが可能です。
いいえ、BigQueryはデータをアップロードする必要がある別の製品です。データストアでは機能しません。GQLを使用してデータストアにクエリを実行できます。
2016年現在、これは非常に可能です。次のことを行う必要があります。
このワークフローの完全な例については、この投稿を参照してください。