15

数十万のオブジェクトを含むGAEデータストアの種類があります。いくつかの関連するクエリ(カウントクエリを含む)を実行したい。Big Queryは、これを行うのに最適なようです。

現在、Big Queryを使用してライブAppEngineデータストアにクエリを実行する簡単な方法はありますか?

4

6 に答える 6

17

データストアエンティティでBigQueryを直接実行することはできませんが、データストアからエンティティを読み取り、Google Cloud StorageのCSVに書き込んでから、それらをBigQueryに取り込むマッパーパイプラインを作成できます。プロセスを自動化することもできます。データストアからCSVへのステップだけに マッパーAPIクラスを使用する例を次に示します。

import re
import time
from datetime import datetime
import urllib
import httplib2
import pickle

from google.appengine.ext import blobstore
from google.appengine.ext import db
from google.appengine.ext import webapp

from google.appengine.ext.webapp.util import run_wsgi_app
from google.appengine.ext.webapp import blobstore_handlers
from google.appengine.ext.webapp import util
from google.appengine.ext.webapp import template

from mapreduce.lib import files
from google.appengine.api import taskqueue
from google.appengine.api import users

from mapreduce import base_handler
from mapreduce import mapreduce_pipeline
from mapreduce import operation as op

from apiclient.discovery import build
from google.appengine.api import memcache
from oauth2client.appengine import AppAssertionCredentials


#Number of shards to use in the Mapper pipeline
SHARDS = 20

# Name of the project's Google Cloud Storage Bucket
GS_BUCKET = 'your bucket'

# DataStore Model
class YourEntity(db.Expando):
  field1 = db.StringProperty() # etc, etc

ENTITY_KIND = 'main.YourEntity'


class MapReduceStart(webapp.RequestHandler):
  """Handler that provides link for user to start MapReduce pipeline.
  """
  def get(self):
    pipeline = IteratorPipeline(ENTITY_KIND)
    pipeline.start()
    path = pipeline.base_path + "/status?root=" + pipeline.pipeline_id
    logging.info('Redirecting to: %s' % path)
    self.redirect(path)


class IteratorPipeline(base_handler.PipelineBase):
  """ A pipeline that iterates through datastore
  """
  def run(self, entity_type):
    output = yield mapreduce_pipeline.MapperPipeline(
      "DataStore_to_Google_Storage_Pipeline",
      "main.datastore_map",
      "mapreduce.input_readers.DatastoreInputReader",
      output_writer_spec="mapreduce.output_writers.FileOutputWriter",
      params={
          "input_reader":{
              "entity_kind": entity_type,
              },
          "output_writer":{
              "filesystem": "gs",
              "gs_bucket_name": GS_BUCKET,
              "output_sharding":"none",
              }
          },
          shards=SHARDS)


def datastore_map(entity_type):
  props = GetPropsFor(entity_type)
  data = db.to_dict(entity_type)
  result = ','.join(['"%s"' % str(data.get(k)) for k in props])
  yield('%s\n' % result)


def GetPropsFor(entity_or_kind):
  if (isinstance(entity_or_kind, basestring)):
    kind = entity_or_kind
  else:
    kind = entity_or_kind.kind()
  cls = globals().get(kind)
  return cls.properties()


application = webapp.WSGIApplication(
                                     [('/start', MapReduceStart)],
                                     debug=True)

def main():
  run_wsgi_app(application)

if __name__ == "__main__":
  main()

これをIteratorPipelineクラスの最後に追加すると、次のようにyield CloudStorageToBigQuery(output)、結果のcsvファイルハンドルをBigQuery取り込みパイプにパイプできます...

class CloudStorageToBigQuery(base_handler.PipelineBase):
  """A Pipeline that kicks off a BigQuery ingestion job.
  """
  def run(self, output):

# BigQuery API Settings
SCOPE = 'https://www.googleapis.com/auth/bigquery'
PROJECT_ID = 'Some_ProjectXXXX'
DATASET_ID = 'Some_DATASET'

# Create a new API service for interacting with BigQuery
credentials = AppAssertionCredentials(scope=SCOPE)
http = credentials.authorize(httplib2.Http())
bigquery_service = build("bigquery", "v2", http=http)

jobs = bigquery_service.jobs()
table_name = 'datastore_dump_%s' % datetime.utcnow().strftime(
    '%m%d%Y_%H%M%S')
files = [str(f.replace('/gs/', 'gs://')) for f in output]
result = jobs.insert(projectId=PROJECT_ID,
                    body=build_job_data(table_name,files)).execute()
logging.info(result)

def build_job_data(table_name, files):
  return {"projectId": PROJECT_ID,
          "configuration":{
              "load": {
                  "sourceUris": files,
                  "schema":{
                      # put your schema here
                      "fields": fields
                      },
                  "destinationTable":{
                      "projectId": PROJECT_ID,
                      "datasetId": DATASET_ID,
                      "tableId": table_name,
                      },
                  }
              }
          }
于 2012-06-10T15:19:46.857 に答える
7

新しい(2013年9月以降の)ストリーミングインサートAPIを使用すると、アプリからBigQueryにレコードをインポートできます。

データはBigQueryですぐに利用できるため、ライブ要件を満たす必要があります。

この質問は少し古くなっていますが、これはこの質問に出くわした人にとってはより簡単な解決策かもしれません

現時点では、ローカル開発サーバーからこれを機能させることは、せいぜいパッチが当てられています。

于 2013-12-03T10:15:16.403 に答える
5

データストアからBigQueryに移行するためのTrustedTesterプログラムを2つの簡単な操作で実行しています。

  1. データストア管理者のバックアップ機能を使用してデータストアをバックアップする
  2. バックアップをBigQueryに直接インポートする

スキーマが自動的に処理されます。

詳細(適用):https ://docs.google.com/a/google.com/spreadsheet/viewform?formkey = dHdpeXlmRlZCNWlYSE9BcE5jc2NYOUE6MQ

于 2012-10-19T18:34:36.337 に答える
3

BigQueryの場合、それらの種類をCSVまたは区切りレコード構造にエクスポートし、BigQueryにロードすると、クエリを実行できます。ライブGAEデータストアにクエリを実行できる機能はありません。

Biqueryは分析クエリエンジンであり、レコードを変更できないことを意味します。更新または削除は許可されていません。追加のみが可能です。

于 2012-06-10T07:27:39.507 に答える
2

いいえ、BigQueryはデータをアップロードする必要がある別の製品です。データストアでは機能しません。GQLを使用してデータストアにクエリを実行できます。

于 2012-06-10T07:04:40.600 に答える
1

2016年現在、これは非常に可能です。次のことを行う必要があります。

  1. Googleストレージで新しいバケットを作成します
  2. console.developers.google.comのデータベース管理者を使用してエンティティをバックアップする完全なチュートリアルがあります
  3. bigquery Web UIに移動し、手順1で生成されたファイルをインポートします。

このワークフローの完全な例については、この投稿を参照してください。

于 2016-02-22T04:31:53.430 に答える