AppEngineデータストアで興味深い制限に遭遇しました。実稼働サーバーの1つで使用状況データを分析するのに役立つハンドラーを作成しています。分析を実行するには、データストアから取得した10,000以上のエンティティをクエリして要約する必要があります。計算は難しくありません。使用サンプルの特定のフィルターを通過するアイテムのヒストグラムにすぎません。私が直面した問題は、クエリの期限に達する前に処理を実行するのに十分な速度でデータストアからデータを取り戻すことができないことです。
パフォーマンスを向上させるために、クエリを並列RPC呼び出しにチャンク化するために考えられるすべてのことを試しましたが、appstatsによると、クエリを実際に並列で実行することはできないようです。私がどの方法を試しても(以下を参照)、RPCは常に次の連続クエリのウォーターフォールにフォールバックしているように見えます。
注:クエリと分析のコードは機能しますが、データストアから十分な速さでデータを取得できないため、実行速度が遅くなります。
バックグラウンド
共有できるライブバージョンはありませんが、これが私が話しているシステムの一部の基本モデルです。
class Session(ndb.Model):
""" A tracked user session. (customer account (company), version, OS, etc) """
data = ndb.JsonProperty(required = False, indexed = False)
class Sample(ndb.Model):
name = ndb.StringProperty (required = True, indexed = True)
session = ndb.KeyProperty (required = True, kind = Session)
timestamp = ndb.DateTimeProperty(required = True, indexed = True)
tags = ndb.StringProperty (repeated = True, indexed = True)
サンプルは、ユーザーが特定の名前の機能を利用するときと考えることができます。(例:'systemA.feature_x')。タグは、顧客の詳細、システム情報、および機能に基づいています。例:['winxp'、 '2.5.1'、'systemA'、'feature_x'、'premium_account'])。したがって、タグは、対象のサンプルを見つけるために使用できる非正規化されたトークンのセットを形成します。
私が行おうとしている分析は、日付範囲を取得し、顧客アカウント(ユーザーごとではなく会社)ごとに1日(または1時間)に使用される一連の機能(おそらくすべての機能)の機能が何回あったかを尋ねることで構成されます。
したがって、ハンドラーへの入力は次のようになります。
- 開始日
- 終了日
- タグ
出力は次のようになります。
[{
'company_account': <string>,
'counts': [
{'timeperiod': <iso8601 date>, 'count': <int>}, ...
]
}, ...
]
クエリの共通コード
すべてのクエリに共通するコードを次に示します。ハンドラーの一般的な構造は、webapp2を使用した単純なgetハンドラーであり、クエリパラメーターを設定し、クエリを実行し、結果を処理し、返すデータを作成します。
# -- Build Query Object --- #
query_opts = {}
query_opts['batch_size'] = 500 # Bring in large groups of entities
q = Sample.query()
q = q.order(Sample.timestamp)
# Tags
tag_args = [(Sample.tags == t) for t in tags]
q = q.filter(ndb.query.AND(*tag_args))
def handle_sample(sample):
session_obj = sample.session.get() # Usually found in local or memcache thanks to ndb
count_key = session_obj.data['customer']
addCountForPeriod(count_key, sample.timestamp)
試した方法
データストアからデータをできるだけ早く並行してプルするために、さまざまな方法を試しました。私がこれまでに試した方法は次のとおりです。
A.シングルイテレーション
これは、他の方法と比較するためのより単純な基本ケースです。クエリを作成し、すべてのアイテムを反復処理して、ndbが次々にそれらをプルするために行うことを実行できるようにします。
q = q.filter(Sample.timestamp >= start_time)
q = q.filter(Sample.timestamp <= end_time)
q_iter = q.iter(**query_opts)
for sample in q_iter:
handle_sample(sample)
B.ラージフェッチ
ここでのアイデアは、1つの非常に大きなフェッチを実行できるかどうかを確認することでした。
q = q.filter(Sample.timestamp >= start_time)
q = q.filter(Sample.timestamp <= end_time)
samples = q.fetch(20000, **query_opts)
for sample in samples:
handle_sample(sample)
C.時間範囲全体での非同期フェッチ
ここでの考え方は、サンプルが時間全体でかなり間隔が空いていることを認識することです。これにより、時間領域全体をチャンクに分割する一連の独立したクエリを作成し、非同期を使用してこれらのそれぞれを並列に実行しようとします。
# split up timestamp space into 20 equal parts and async query each of them
ts_delta = (end_time - start_time) / 20
cur_start_time = start_time
q_futures = []
for x in range(ts_intervals):
cur_end_time = (cur_start_time + ts_delta)
if x == (ts_intervals-1): # Last one has to cover full range
cur_end_time = end_time
f = q.filter(Sample.timestamp >= cur_start_time,
Sample.timestamp < cur_end_time).fetch_async(limit=None, **query_opts)
q_futures.append(f)
cur_start_time = cur_end_time
# Now loop through and collect results
for f in q_futures:
samples = f.get_result()
for sample in samples:
handle_sample(sample)
D.非同期マッピング
このメソッドを試したのは、Query.map_asyncメソッドを使用すると、ndbが自動的に並列処理を利用する可能性があるように思われるためです。
q = q.filter(Sample.timestamp >= start_time)
q = q.filter(Sample.timestamp <= end_time)
@ndb.tasklet
def process_sample(sample):
period_ts = getPeriodTimestamp(sample.timestamp)
session_obj = yield sample.session.get_async() # Lookup the session object from cache
count_key = session_obj.data['customer']
addCountForPeriod(count_key, sample.timestamp)
raise ndb.Return(None)
q_future = q.map_async(process_sample, **query_opts)
res = q_future.get_result()
結果
全体的な応答時間とappstatsトレースを収集するために、1つのクエリ例をテストしました。結果は次のとおりです。
A.シングルイテレーション
実数:15.645秒
これは、バッチを次々にフェッチしてから、memcacheからすべてのセッションを取得します。
B.ラージフェッチ
実数:12.12秒
オプションAと実質的に同じですが、何らかの理由で少し高速です。
C.時間範囲全体での非同期フェッチ
実数:15.251秒
最初はより多くの並列処理を提供するように見えますが、結果の反復中に次への一連の呼び出しによって速度が低下するようです。また、セッションmemcacheルックアップを保留中のクエリとオーバーラップさせることはできないようです。
D.非同期マッピング
実数:13.752秒
これは私が理解するのが最も難しいです。かなりの重なりがあるように見えますが、すべてが並列ではなく滝の中で伸びているようです。
推奨事項
これらすべてに基づいて、私は何が欠けていますか?App Engineの制限に達したばかりですか、それとも多数のエンティティを並行してプルダウンするためのより良い方法がありますか?
次に何をしようか迷っています。クライアントを書き直して、App Engineに複数のリクエストを並行して行うことを考えましたが、これはかなりブルートフォースのようです。App Engineがこのユースケースを処理できるはずなので、何か足りないものがあると思います。
アップデート
結局、私の場合はオプションCが最適であることがわかりました。6.1秒で完了するように最適化することができました。まだ完璧ではありませんが、はるかに優れています。
何人かの人からアドバイスを受けたところ、次の項目を理解し、覚えておくことが重要であることがわかりました。
- 複数のクエリを並行して実行できます
- 一度に飛行できるRPCは10台のみです。
- 二次クエリがない点まで非正規化してみてください
- このタイプのタスクは、リアルタイムクエリではなく、reduceキューとタスクキューをマップするために残しておくことをお勧めします
それで、私がそれをより速くするためにしたこと:
- クエリスペースを最初から時間に基づいて分割しました。(注:返されるエンティティに関してパーティションが等しいほど良い)
- データをさらに非正規化して、セカンダリセッションクエリの必要性を排除しました
- ndb非同期操作とwait_any()を使用して、クエリを処理とオーバーラップさせました
期待したり、期待したりするパフォーマンスがまだ得られていませんが、今のところは機能しています。ハンドラーで多数のシーケンシャルエンティティをメモリにすばやくプルするためのより良い方法であるといいのですが。