62

AppEngineデータストアで興味深い制限に遭遇しました。実稼働サーバーの1つで使用状況データを分析するのに役立つハンドラーを作成しています。分析を実行するには、データストアから取得した10,000以上のエンティティをクエリして要約する必要があります。計算は難しくありません。使用サンプルの特定のフィルターを通過するアイテムのヒストグラムにすぎません。私が直面した問題は、クエリの期限に達する前に処理を実行するのに十分な速度でデータストアからデータを取り戻すことができないことです。

パフォーマンスを向上させるために、クエリを並列RPC呼び出しにチャンク化するために考えられるすべてのことを試しましたが、appstatsによると、クエリを実際に並列で実行することはできないようです。私がどの方法を試しても(以下を参照)、RPCは常に次の連続クエリのウォーターフォールにフォールバックしているように見えます。

注:クエリと分析のコードは機能しますが、データストアから十分な速さでデータを取得できないため、実行速度が遅くなります。

バックグラウンド

共有できるライブバージョンはありませんが、これが私が話しているシステムの一部の基本モデルです。

class Session(ndb.Model):
   """ A tracked user session. (customer account (company), version, OS, etc) """
   data = ndb.JsonProperty(required = False, indexed = False)

class Sample(ndb.Model):
   name      = ndb.StringProperty  (required = True,  indexed = True)
   session   = ndb.KeyProperty     (required = True,  kind = Session)
   timestamp = ndb.DateTimeProperty(required = True,  indexed = True)
   tags      = ndb.StringProperty  (repeated = True,  indexed = True)

サンプルは、ユーザーが特定の名前の機能を利用するときと考えることができます。(例:'systemA.feature_x')。タグは、顧客の詳細、システム情報、および機能に基づいています。例:['winxp'、 '2.5.1'、'systemA'、'feature_x'、'premium_account'])。したがって、タグは、対象のサンプルを見つけるために使用できる非正規化されたトークンのセットを形成します。

私が行おうとしている分析は、日付範囲を取得し、顧客アカウント(ユーザーごとではなく会社)ごとに1日(または1時間)に使用される一連の機能(おそらくすべての機能)の機能が何回あったかを尋ねることで構成されます。

したがって、ハンドラーへの入力は次のようになります。

  • 開始日
  • 終了日
  • タグ

出力は次のようになります。

[{
   'company_account': <string>,
   'counts': [
      {'timeperiod': <iso8601 date>, 'count': <int>}, ...
   ]
 }, ...
]

クエリの共通コード

すべてのクエリに共通するコードを次に示します。ハンドラーの一般的な構造は、webapp2を使用した単純なgetハンドラーであり、クエリパラメーターを設定し、クエリを実行し、結果を処理し、返すデータを作成します。

# -- Build Query Object --- #
query_opts = {}
query_opts['batch_size'] = 500   # Bring in large groups of entities

q = Sample.query()
q = q.order(Sample.timestamp)

# Tags
tag_args = [(Sample.tags == t) for t in tags]
q = q.filter(ndb.query.AND(*tag_args))

def handle_sample(sample):
   session_obj = sample.session.get()    # Usually found in local or memcache thanks to ndb
   count_key   = session_obj.data['customer']
   addCountForPeriod(count_key, sample.timestamp)

試した方法

データストアからデータをできるだけ早く並行してプルするために、さまざまな方法を試しました。私がこれまでに試した方法は次のとおりです。

A.シングルイテレーション

これは、他の方法と比較するためのより単純な基本ケースです。クエリを作成し、すべてのアイテムを反復処理して、ndbが次々にそれらをプルするために行うことを実行できるようにします。

q = q.filter(Sample.timestamp >= start_time)
q = q.filter(Sample.timestamp <= end_time)
q_iter = q.iter(**query_opts)

for sample in q_iter:
   handle_sample(sample)

B.ラージフェッチ

ここでのアイデアは、1つの非常に大きなフェッチを実行できるかどうかを確認することでした。

q = q.filter(Sample.timestamp >= start_time)
q = q.filter(Sample.timestamp <= end_time)
samples = q.fetch(20000, **query_opts)

for sample in samples:
   handle_sample(sample)

C.時間範囲全体での非同期フェッチ

ここでの考え方は、サンプルが時間全体でかなり間隔が空いていることを認識することです。これにより、時間領域全体をチャンクに分割する一連の独立したクエリを作成し、非同期を使用してこれらのそれぞれを並列に実行しようとします。

# split up timestamp space into 20 equal parts and async query each of them
ts_delta       = (end_time - start_time) / 20
cur_start_time = start_time
q_futures = []

for x in range(ts_intervals):
   cur_end_time = (cur_start_time + ts_delta)
   if x == (ts_intervals-1):    # Last one has to cover full range
      cur_end_time = end_time

   f = q.filter(Sample.timestamp >= cur_start_time,
                Sample.timestamp < cur_end_time).fetch_async(limit=None, **query_opts)
   q_futures.append(f)
   cur_start_time = cur_end_time

# Now loop through and collect results
for f in q_futures:
   samples = f.get_result()
   for sample in samples:
      handle_sample(sample)

D.非同期マッピング

このメソッドを試したのは、Query.map_asyncメソッドを使用すると、ndbが自動的に並列処理を利用する可能性があるように思われるためです。

q = q.filter(Sample.timestamp >= start_time)
q = q.filter(Sample.timestamp <= end_time)

@ndb.tasklet
def process_sample(sample):
   period_ts   = getPeriodTimestamp(sample.timestamp)
   session_obj = yield sample.session.get_async()    # Lookup the session object from cache
   count_key   = session_obj.data['customer']
   addCountForPeriod(count_key, sample.timestamp)
   raise ndb.Return(None)

q_future = q.map_async(process_sample, **query_opts)
res = q_future.get_result()

結果

全体的な応答時間とappstatsトレースを収集するために、1つのクエリ例をテストしました。結果は次のとおりです。

A.シングルイテレーション

実数:15.645秒

これは、バッチを次々にフェッチしてから、memcacheからすべてのセッションを取得します。

方法Aのappstats

B.ラージフェッチ

実数:12.12秒

オプションAと実質的に同じですが、何らかの理由で少し高速です。

方法Bのappstats

C.時間範囲全体での非同期フェッチ

実数:15.251秒

最初はより多くの並列処理を提供するように見えますが、結果の反復中に次への一連の呼び出しによって速度が低下するようです。また、セッションmemcacheルックアップを保留中のクエリとオーバーラップさせることはできないようです。

方法Cのappstats

D.非同期マッピング

実数:13.752秒

これは私が理解するのが最も難しいです。かなりの重なりがあるように見えますが、すべてが並列ではなく滝の中で伸びているようです。

方法Dのappstats

推奨事項

これらすべてに基づいて、私は何が欠けていますか?App Engineの制限に達したばかりですか、それとも多数のエンティティを並行してプルダウンするためのより良い方法がありますか?

次に何をしようか迷っています。クライアントを書き直して、App Engineに複数のリクエストを並行して行うことを考えましたが、これはかなりブルートフォースのようです。App Engineがこのユースケースを処理できるはずなので、何か足りないものがあると思います。

アップデート

結局、私の場合はオプションCが最適であることがわかりました。6.1秒で完了するように最適化することができました。まだ完璧ではありませんが、はるかに優れています。

何人かの人からアドバイスを受けたところ、次の項目を理解し、覚えておくことが重要であることがわかりました。

  • 複数のクエリを並行して実行できます
  • 一度に飛行できるRPCは10台のみです。
  • 二次クエリがない点まで非正規化してみてください
  • このタイプのタスクは、リアルタイムクエリではなく、reduceキューとタスクキューをマップするために残しておくことをお勧めします

それで、私がそれをより速くするためにしたこと:

  • クエリスペースを最初から時間に基づいて分割しました。(注:返されるエンティティに関してパーティションが等しいほど良い)
  • データをさらに非正規化して、セカンダリセッションクエリの必要性を排除しました
  • ndb非同期操作とwait_any()を使用して、クエリを処理とオーバーラップさせました

期待したり、期待したりするパフォーマンスがまだ得られていませんが、今のところは機能しています。ハンドラーで多数のシーケンシャルエンティティをメモリにすばやくプルするためのより良い方法であるといいのですが。

4

4 に答える 4

8

このような大規模な処理は、60 秒の時間制限があるユーザー リクエストでは実行しないでください。代わりに、実行時間の長いリクエストをサポートするコンテキストで実行する必要があります。タスク キューは最大 10 分のリクエストをサポートし、(私が信じている) 通常のメモリ制限 (デフォルトの F1 インスタンスには128MB のメモリがあります) をサポートします。さらに高い制限 (リクエスト タイムアウトなし、1 GB 以上のメモリ) の場合は、backendsを使用します。

アクセス時にタスク キュー タスクを起動する URL を設定します。タスク キュー タスクがまだ完了している場合に true/false で応答する別の URL に ~5 秒ごとにポーリングする Web ページを返します。タスク キューはデータを処理します。これには数十秒かかる場合があり、計算されたデータまたはレンダリングされた Web ページとして結果をデータストアに保存します。最初のページが完了したことを検出すると、ユーザーはそのページにリダイレクトされ、現在計算されている結果がデータストアから取得されます。

于 2012-07-17T07:50:46.403 に答える
0

App Engine での大規模なデータ操作は、ある種の mapreduce 操作を使用して実装するのが最適です。

プロセスを説明するビデオを次に示しますが、BigQuery も含まれてい ます https://developers.google.com/events/io/sessions/gooio2012/307/

BigQuery が必要なようには思えませんが、おそらくパイプラインの Map 部分と Reduce 部分の両方を使用する必要があります。

あなたが行っていることと mapreduce の状況との主な違いは、1 つのインスタンスを起動してクエリを反復処理していることです。mapreduce では、クエリごとに個別のインスタンスを並行して実行します。すべてのデータを「合計」し、結果をどこかに書き込むには、reduce 操作が必要になります。

もう1つの問題は、カーソルを使用して反復する必要があることです。 https://developers.google.com/appengine/docs/java/datastore/queries#Query_Cursors

イテレータがクエリ オフセットを使用している場合、オフセットは同じクエリを発行し、多数の結果をスキップして次のセットを提供するのに対し、カーソルは次のセットに直接ジャンプするため、非効率的です。

于 2012-07-16T17:43:52.227 に答える