13

アプリケーションでボトルネックが発生しており、解決策を見つけるのに苦労しています。少し背景:

  • 私のアプリは API に ping を実行して、何十万ものアイテムに関する情報を収集し、データストアに保存します
  • これらのアイテムのディメンションの組み合わせで単純な集計を実行する必要があります。これは、アイテムを保存している間に試行および計算します。

現在の実装:

  • 必要に応じてこれらのアイテムのダウンロードを手動で開始します。これにより、これらのアイテムのダウンロード専用のバックエンドにタスクが作成されます。各タスクは、ページ分割してすべてのアイテムを取得するために必要な API 呼び出しの数に応じて、より多くのタスクを起動します。
  • 各タスクは、ディクショナリを使用して必要な集計をメモリに保持しながら、アイテムをダウンロード、解析、および一括保存します。
  • 各タスクの実行の最後に、集計のディクショナリをプル キューに書き込みます。
  • API 呼び出しの終わりに近づいていることを検出したら、2 番目のバックエンド構成への集約タスクを開始します。
  • この「集約タスク」は、プル キューからプルし (一度に 20 個)、各タスクで見つかった辞書をマージし (さらにメモリ集約で実行します)、各集約を格納しようとします。このタスクは、プル キュー内の残りのタスク (数百) の集計を実行するために、他のタスクも起動します。
  • シャード カウンターアプローチを使用して、データストアに保存する際の競合を軽減します。
  • 各集計タスクは、500 ~ 1500 の集計を試して保存できますが、これらはすべて互いに独立している必要があります。

そこには、すべてのプル キュー タスクが適切に処理され、すべてのアイテムがダウンロードされていることを確認するための追加のチェックなどが含まれています。

問題:

すべてのアイテムと集計をできるだけ早くダウンロードして保存したいと考えています。説明したバックエンド構成ごとに 20 個のインスタンスを有効にしています (これらを「アグリゲーター」バックエンドおよび「ダウンローダー」バックエンドと呼びます)。ダウンローダのバックエンドは、API 呼び出しをかなり高速に処理しているようです。これを取得するために、NDB ライブラリと非同期 URL フェッチ/データストア呼び出しを多用しています。また、次のタスクを開始する前に RPC 呼び出しが完了するのを待機するインスタンスがないように、threadsafe:true を有効にしました (すべてのタスクは互いに独立して操作でき、冪等です)。

アグリゲーターのバックエンドは、ビッグ タイム シンクが機能する場所です。これらの集計を 500 ~ 1500 個トランザクションを通じて非同期に保存するには、40 秒以上かかります (すべてのトランザクションが適切にコミットされているとは思えません)。300 秒のプル キューの有効期限を使用するため、このバックエンドを threadsafe:false で保持しますが、1 つのインスタンスで複数のタスクを実行できるようにすると、それらがカスケード ダウンし、300 秒を超えて一部のタスクの完了をプッシュする可能性があります。マークを付けると、別のタスクが同じタスクを 2 度目にプルできるようになり、二重カウントが発生する可能性があります。

ログにはBadRequestError: Nested transactions are not supported.、(スタック トレース内の) の以前のエラーが表示されますTransactionFailedError: too much contention on these datastore entities. please try again.。私がよく見る別のエラーはBadRequestError(The referenced transaction has expired or is no longer valid.)

私の理解では、これらのエラーは、それ以上操作しなくてもトランザクションをコミットできることを意味する場合があります。これが適切にコミットされているかどうかはどうすればわかりますか? 私はこれを論理的/効率的な方法で行っていますか?それとも、すべてを台無しにするリスクなしに同時実行の余地がありますか?

関連コード:

class GeneralShardConfig(ndb.Model):
    """Tracks the number of shards for each named counter."""
    name = ndb.StringProperty(required=True)
    num_shards = ndb.IntegerProperty(default=4)

class GeneralAggregateShard(ndb.Model):
    """Shards for each named counter"""
    name = ndb.StringProperty(name='n', required=True)
    count = ndb.FloatProperty(name='c', default=0.00) #acts as a total now

@ndb.tasklet
def increment_batch(data_set):
    def run_txn(name, value):
        @ndb.tasklet
        def txn():
            to_put = []
            dbkey = ndb.Key(GeneralShardConfig, name)
            config = yield dbkey.get_async(use_memcache=False)
            if not config:
                config = GeneralShardConfig(key=dbkey,name=name)
                to_put.append(config)
            index = random.randint(0, config.num_shards-1)
            shard_name =  name + str(index)
            dbkey = ndb.Key(GeneralAggregateShard, shard_name)
            counter = yield dbkey.get_async()
            if not counter:
                counter = GeneralAggregateShard(key=dbkey, name=name)
            counter.count += value
            to_put.append(counter)
            yield ndb.put_multi_async(to_put)
        return ndb.transaction_async(txn, use_memcache=False, xg=True)
    res = yield[run_txn(key, value) for key, value in data_set.iteritems() if value != 0.00]
    raise ndb.Return(res)

実装を考えると、「競合」の唯一の余地は、2 つ以上の集約タスクが同じ集約名を更新する必要がある場合です。これはあまり頻繁に発生するべきではありません。 、 発生する。BadRequestError(The referenced transaction has expired or is no longer valid.)イベントループがすべてのタスクレットのステータスをチェックし、終了したトランザクションへの参照にヒットすると、 エラーが表示されると思います。ここでの問題は、エラーが発生したことです。これは、すべてのトランザクションが途中で切断されたことを意味しますか、それともすべてのトランザクションが完了したと想定できますか? さらにres = yield[run_txn(key, value) for key, value in data_set.iteritems() if value != 0.00]、これらのエラーを検出するには、この行を各タスクレットの try/except に分割する必要があると思います。

これに腹を立てる前に、このプロセスを最適化し、信頼できる方法で行う方法についてのガイダンス/ヘルプをいただければ幸いです。

編集 1: アグリゲーター タスクの動作を次のように変更しました。

  • 複数のタスクがキューからリースされた場合、タスクをメモリに集約し、その結果をプル キュー内の別のタスクに格納し、すぐに別の「集約タスク」を起動します。
  • または、1 つのタスクがリースされた場合は、結果を保存してみてください

これにより、これまで目にしてきた競合エラーを減らすことができましたが、まだ信頼性は高くありません。ごく最近、BadRequestError: Nested transactions are not supported.スタックトレースでヒットしましたRuntimeError: Deadlock waiting for <Future fbf0db50 created by transaction_async(model.py:3345) for tasklet transaction(context.py:806) suspended generator transaction(context.py:876); pending>

私は、この変更により、集約プロセスで発生する可能性のあるすべてのオーバーラップを組み合わせて単一のインスタンスで一度に試行できるようにすることで、プロセスを最適化する必要があると考えています。結果を信頼できる方法で保存するには、まだ問題があります。

4

2 に答える 2

5

データストアのI/Oを減らす(オートバッチャーに作業を任せ、インデックス作成を無効にする)ことで、データストアの書き込みが完全になり(競合が少なくなり)、より高速になるはずです。

構成(名前が変更されたカウンター)の取得はトランザクションの外部にあり、トランザクションをループしながら同時に実行できます。

メソッドと合計プロパティがCounterに追加され、(うまくいけば)将来の変更が容易になります。

10進数をサポートするための新しいndbプロパティを作成しました(これが、0.0ではなく0.00を指定している理由であると想定しています)。

編集:

トランザクションの必要性を排除し、信頼性のためにシャーディングシステムを変更しました。

import webapp2

import copy
import decimal
import logging
import random
import string

from google.appengine.api import datastore_errors
from google.appengine.datastore import entity_pb
from google.appengine.ext import deferred
from google.appengine.ext import ndb


TEST_BATCH_SIZE = 250
TEST_NAME_LEN = 12


class DecimalProperty(ndb.Property):
    """A Property whose value is a decimal.Decimal object."""

    def _datastore_type(self, value):
      return str(value)

    def _validate(self, value):
      if not isinstance(value, decimal.Decimal):
        raise datastore_errors.BadValueError('Expected decimal.Decimal, got %r'
                                             % (value,))
      return value

    def _db_set_value(self, v, p, value):
        value = str(value)
        v.set_stringvalue(value)
        if not self._indexed:
            p.set_meaning(entity_pb.Property.TEXT)

    def _db_get_value(self, v, _):
        if not v.has_stringvalue():
            return None
        value = v.stringvalue()
        return decimal.Decimal(value)

class BatchInProgress(ndb.Model):
    """Use a scheduler to delete batches in progress after a certain time"""

    started = ndb.DateTimeProperty(auto_now=True)

    def clean_up(self):
        qry = Shard.query().filter(Shard.batch_key == self.key)
        keys = qry.fetch(keys_only=True)
        while keys:
            ndb.delete_multi(keys)
            keys = qry.fetch(keys_only=True)

def cleanup_failed_batch(batch_key):
    batch = batch_key.get()

    if batch:
        batch.clean_up()
        batch.delete()

class Shard(ndb.Model):
    """Shards for each named counter"""

    counter_key = ndb.KeyProperty(name='c')
    batch_key = ndb.KeyProperty(name='b')
    count = DecimalProperty(name='v', default=decimal.Decimal('0.00'),
                            indexed=False)

class Counter(ndb.Model):
    """Tracks the number of shards for each named counter"""

    @property
    def shards(self):
        qry = Shard.query().filter(Shard.counter_key == self.key)
        results = qry.fetch(use_cache=False, use_memcache=False)
        return filter(None, results)

    @property
    def total(self):
        count = decimal.Decimal('0.00') # Use initial value if no shards

        for shard in self.shards:
            count += shard.count

        return count

    @ndb.tasklet
    def incr_async(self, value, batch_key):
        index = batch_key.id()
        name = self.key.id() + str(index)

        shard = Shard(id=name, count=value,
                      counter_key=self.key, batch_key=batch_key)

        yield shard.put_async(use_cache=False, use_memcache=False)

    def incr(self, *args, **kwargs):
        return self.incr_async(*args, **kwargs).get_result()

@ndb.tasklet
def increment_batch(data_set):
    batch_key = yield BatchInProgress().put_async()
    deferred.defer(cleanup_failed_batch, batch_key, _countdown=3600)

    # NOTE: mapping is modified in place, hence copying
    mapping = copy.copy(data_set)

    # (1/3) filter and fire off counter gets
    #       so the futures can autobatch
    counters = {}
    ctr_futs = {}
    ctr_put_futs = []
    zero_values = set()
    for name, value in mapping.iteritems():
        if value != decimal.Decimal('0.00'):
            ctr_fut = Counter.get_by_id_async(name) # Use cache(s)
            ctr_futs[name] = ctr_fut
        else:
            # Skip zero values because...
            zero_values.add(name)
            continue

    for name in zero_values:
        del mapping[name] # Remove all zero values from the mapping
    del zero_values

    while mapping: # Repeat until all transactions succeed

        # (2/3) wait on counter gets and fire off increment transactions
        #       this way autobatchers should fill time
        incr_futs = {}
        for name, value in mapping.iteritems():
            counter = counters.get(name)
            if not counter:
                counter = counters[name] = yield ctr_futs.pop(name)
            if not counter:
                logging.info('Creating new counter %s', name)
                counter = counters[name] = Counter(id=name)
                ctr_put_futs.append(counter.put_async())
            else:
                logging.debug('Reusing counter %s', name)
            incr_fut = counter.incr_async(value, batch_key)
            incr_futs[(name, value)] = incr_fut

        # (3/3) wait on increments and handle errors
        #       by using a tuple key for variable access
        for (name, value), incr_fut in incr_futs.iteritems():
            counter = counters[name]
            try:
                yield incr_fut
            except:
                pass
            else:
                del mapping[name]

        if mapping:
            logging.warning('%i increments failed this batch.' % len(mapping))

    yield batch_key.delete_async(), ctr_put_futs

    raise ndb.Return(counters.values())

class ShardTestHandler(webapp2.RequestHandler):

    @ndb.synctasklet
    def get(self):
        if self.request.GET.get('delete'):
            ndb.delete_multi_async(Shard.query().fetch(keys_only=True))
            ndb.delete_multi_async(Counter.query().fetch(keys_only=True))
            ndb.delete_multi_async(BatchInProgress.query().fetch(keys_only=True))
        else:
            data_set_test = {}
            for _ in xrange(TEST_BATCH_SIZE):
                name = ''
                for _ in xrange(TEST_NAME_LEN):
                    name += random.choice(string.letters)
                value = decimal.Decimal('{0:.2f}'.format(random.random() * 100))
                data_set_test[name] = value
            yield increment_batch(data_set_test)
        self.response.out.write("Done!")

app = webapp2.WSGIApplication([('/shard_test/', ShardTestHandler)], debug=True)
app = ndb.toplevel(app.__call__)
于 2012-07-02T20:07:21.033 に答える
5

具体的には、「参照されたトランザクションの有効期限が切れているか、有効ではなくなっています」という BadRequestError のトピックについては、トランザクションが要求よりもはるかに早くタイムアウトになるという事実はあまり宣伝されていません。作成から 15 秒のライフを無料で取得し、その後、トランザクションが 15 秒間連続してアイドル状態になると (有効な最小ライフスパンは 30 秒です)、トランザクションは強制終了され、60 秒後に何があっても強制終了されます。これにより、多数のトランザクションを並行して実行することが困難になります。これは、CPU の競合と不公平なタスクレット スケジューリング アルゴリズムが共謀して一部のトランザクションを長時間アイドル状態に保つことができるためです。

次の ndb のトランザクション メソッドへの monkeypatch は、期限切れのトランザクションを再試行することで少し役立ちますが、最終的にはバッチ処理を調整して、競合を管理可能なレベルに減らす必要があります。

_ndb_context_transaction = ndb.Context.transaction

@ndb.tasklet
def _patched_transaction(self, callback, **ctx_options):
  if (self.in_transaction() and
      ctx_options.get('propagation') != ndb.TransactionOptions.INDEPENDENT):
    raise ndb.Return((yield _ndb_context_transaction(self, callback, **ctx_options)))

  attempts = 1
  start_time = time.time()
  me = random.getrandbits(16)
  logging.debug('Transaction started <%04x>', me)
  while True:
    try:
      result = yield _ndb_context_transaction(self, callback, **ctx_options)
    except datastore_errors.BadRequestError as e:
      if not ('expired' in str(e) and
              attempts < _MAX_BAD_REQUEST_RECOVERY_ATTEMPTS):
        raise
      logging.warning(
          'Transaction retrying <%04x> (attempt #%d, %.1f seconds) on BadRequestError: %s',
          me, attempts, time.time() - start_time, e)
      attempts += 1
    else:
      logging.debug(
          'Transaction finished <%04x> (attempt #%d, %.1f seconds)',
           me, attempts, time.time() - start_time)
      raise ndb.Return(result)

ndb.Context.transaction = _patched_transaction
于 2013-01-10T22:15:53.700 に答える