アプリケーションでボトルネックが発生しており、解決策を見つけるのに苦労しています。少し背景:
- 私のアプリは API に ping を実行して、何十万ものアイテムに関する情報を収集し、データストアに保存します
- これらのアイテムのディメンションの組み合わせで単純な集計を実行する必要があります。これは、アイテムを保存している間に試行および計算します。
現在の実装:
- 必要に応じてこれらのアイテムのダウンロードを手動で開始します。これにより、これらのアイテムのダウンロード専用のバックエンドにタスクが作成されます。各タスクは、ページ分割してすべてのアイテムを取得するために必要な API 呼び出しの数に応じて、より多くのタスクを起動します。
- 各タスクは、ディクショナリを使用して必要な集計をメモリに保持しながら、アイテムをダウンロード、解析、および一括保存します。
- 各タスクの実行の最後に、集計のディクショナリをプル キューに書き込みます。
- API 呼び出しの終わりに近づいていることを検出したら、2 番目のバックエンド構成への集約タスクを開始します。
- この「集約タスク」は、プル キューからプルし (一度に 20 個)、各タスクで見つかった辞書をマージし (さらにメモリ集約で実行します)、各集約を格納しようとします。このタスクは、プル キュー内の残りのタスク (数百) の集計を実行するために、他のタスクも起動します。
- シャード カウンターアプローチを使用して、データストアに保存する際の競合を軽減します。
- 各集計タスクは、500 ~ 1500 の集計を試して保存できますが、これらはすべて互いに独立している必要があります。
そこには、すべてのプル キュー タスクが適切に処理され、すべてのアイテムがダウンロードされていることを確認するための追加のチェックなどが含まれています。
問題:
すべてのアイテムと集計をできるだけ早くダウンロードして保存したいと考えています。説明したバックエンド構成ごとに 20 個のインスタンスを有効にしています (これらを「アグリゲーター」バックエンドおよび「ダウンローダー」バックエンドと呼びます)。ダウンローダのバックエンドは、API 呼び出しをかなり高速に処理しているようです。これを取得するために、NDB ライブラリと非同期 URL フェッチ/データストア呼び出しを多用しています。また、次のタスクを開始する前に RPC 呼び出しが完了するのを待機するインスタンスがないように、threadsafe:true を有効にしました (すべてのタスクは互いに独立して操作でき、冪等です)。
アグリゲーターのバックエンドは、ビッグ タイム シンクが機能する場所です。これらの集計を 500 ~ 1500 個トランザクションを通じて非同期に保存するには、40 秒以上かかります (すべてのトランザクションが適切にコミットされているとは思えません)。300 秒のプル キューの有効期限を使用するため、このバックエンドを threadsafe:false で保持しますが、1 つのインスタンスで複数のタスクを実行できるようにすると、それらがカスケード ダウンし、300 秒を超えて一部のタスクの完了をプッシュする可能性があります。マークを付けると、別のタスクが同じタスクを 2 度目にプルできるようになり、二重カウントが発生する可能性があります。
ログにはBadRequestError: Nested transactions are not supported.
、(スタック トレース内の) の以前のエラーが表示されますTransactionFailedError: too much contention on these datastore entities. please try again.
。私がよく見る別のエラーはBadRequestError(The referenced transaction has expired or is no longer valid.)
私の理解では、これらのエラーは、それ以上操作しなくてもトランザクションをコミットできることを意味する場合があります。これが適切にコミットされているかどうかはどうすればわかりますか? 私はこれを論理的/効率的な方法で行っていますか?それとも、すべてを台無しにするリスクなしに同時実行の余地がありますか?
関連コード:
class GeneralShardConfig(ndb.Model):
"""Tracks the number of shards for each named counter."""
name = ndb.StringProperty(required=True)
num_shards = ndb.IntegerProperty(default=4)
class GeneralAggregateShard(ndb.Model):
"""Shards for each named counter"""
name = ndb.StringProperty(name='n', required=True)
count = ndb.FloatProperty(name='c', default=0.00) #acts as a total now
@ndb.tasklet
def increment_batch(data_set):
def run_txn(name, value):
@ndb.tasklet
def txn():
to_put = []
dbkey = ndb.Key(GeneralShardConfig, name)
config = yield dbkey.get_async(use_memcache=False)
if not config:
config = GeneralShardConfig(key=dbkey,name=name)
to_put.append(config)
index = random.randint(0, config.num_shards-1)
shard_name = name + str(index)
dbkey = ndb.Key(GeneralAggregateShard, shard_name)
counter = yield dbkey.get_async()
if not counter:
counter = GeneralAggregateShard(key=dbkey, name=name)
counter.count += value
to_put.append(counter)
yield ndb.put_multi_async(to_put)
return ndb.transaction_async(txn, use_memcache=False, xg=True)
res = yield[run_txn(key, value) for key, value in data_set.iteritems() if value != 0.00]
raise ndb.Return(res)
実装を考えると、「競合」の唯一の余地は、2 つ以上の集約タスクが同じ集約名を更新する必要がある場合です。これはあまり頻繁に発生するべきではありません。 、 発生する。BadRequestError(The referenced transaction has expired or is no longer valid.)
イベントループがすべてのタスクレットのステータスをチェックし、終了したトランザクションへの参照にヒットすると、
エラーが表示されると思います。ここでの問題は、エラーが発生したことです。これは、すべてのトランザクションが途中で切断されたことを意味しますか、それともすべてのトランザクションが完了したと想定できますか? さらにres = yield[run_txn(key, value) for key, value in data_set.iteritems() if value != 0.00]
、これらのエラーを検出するには、この行を各タスクレットの try/except に分割する必要があると思います。
これに腹を立てる前に、このプロセスを最適化し、信頼できる方法で行う方法についてのガイダンス/ヘルプをいただければ幸いです。
編集 1: アグリゲーター タスクの動作を次のように変更しました。
- 複数のタスクがキューからリースされた場合、タスクをメモリに集約し、その結果をプル キュー内の別のタスクに格納し、すぐに別の「集約タスク」を起動します。
- または、1 つのタスクがリースされた場合は、結果を保存してみてください
これにより、これまで目にしてきた競合エラーを減らすことができましたが、まだ信頼性は高くありません。ごく最近、BadRequestError: Nested transactions are not supported.
スタックトレースでヒットしましたRuntimeError: Deadlock waiting for <Future fbf0db50 created by transaction_async(model.py:3345) for tasklet transaction(context.py:806) suspended generator transaction(context.py:876); pending>
私は、この変更により、集約プロセスで発生する可能性のあるすべてのオーバーラップを組み合わせて単一のインスタンスで一度に試行できるようにすることで、プロセスを最適化する必要があると考えています。結果を信頼できる方法で保存するには、まだ問題があります。