0

API プロバイダーから大量のデータを読み取っています。応答を受け取ったら、データをスキャンして再パッケージ化し、App Engine データストアに入れる必要があります。特定の大きなアカウントには、最大 50,000 のエントリが含まれます。

API からエントリを取得するたびに、500 エントリを一時テーブルにバッチとして保存し、処理タスクをキューに送信します。1 つのキュー内であまりにも多くのタスクが詰まった場合に備えて、合計 6 つのキューを使用します。

count = 0 
worker_number = 6
for folder, property in entries:
                    data[count] = {
                        # repackaging data here
                    }

                    count = (count + 1) % 500

                    if count == 0:
                        cache = ClientCache(parent=user_key, data=json.dumps(data))
                        cache.put()
                        params = {
                            'access_token': access_token,
                            'client_key': client.key.urlsafe(),
                            'user_key': user_key.urlsafe(),
                            'cache_key': cache.key.urlsafe(),
                        }
                        taskqueue.add(
                            url=task_url,
                            params=params,
                            target='dbworker',
                            queue_name='worker%d' % worker_number)
                        worker_number = (worker_number + 1) % 6

そして、task_url は次のようになります。

logging.info('--------------------- Process File ---------------------')
        user_key = ndb.Key(urlsafe=self.request.get('user_key'))
        client_key = ndb.Key(urlsafe=self.request.get('client_key'))
        cache_key = ndb.Key(urlsafe=self.request.get('cache_key'))

        cache = cache_key.get()
        data = json.loads(cache.data)
        for property in data.values():
            logging.info(property)
            try:
                key_name = '%s%s' % (property['key1'], property['key2'])
                metadata = Metadata.get_or_insert(
                    key_name,
                    parent=user_key,
                    client_key=client_key,
                    # ... other info
                )
                metadata.put()
            except StandardError, e:
                logging.error(e.message)

すべてのタスクはバックエンドで実行されています。

このような構造で、うまく機能しています。まあ...ほとんどの場合。しかし、時々このエラーが発生します:

2013-09-19 15:10:07.788
suspended generator transaction(context.py:938) raised TransactionFailedError(The transaction could not be committed. Please try again.)
W 2013-09-19 15:10:07.788
suspended generator internal_tasklet(model.py:3321) raised TransactionFailedError(The transaction could not be committed. Please try again.)
E 2013-09-19 15:10:07.789
The transaction could not be committed. Please try again.

データストアへの書き込みが頻繁すぎることが問題のようですか? どうすればペースのバランスを取り、ワーカーをスムーズに走らせることができるかを知りたいです... また、パフォーマンスをさらに向上させる方法はありますか? 私のキュー構成は次のようなものです:

- name: worker0
  rate: 120/s
  bucket_size: 100
  retry_parameters:
    task_retry_limit: 3
4

1 に答える 1

2

一度に 1 つのエンティティを作成しています。

コードを変更してバッチで書き込むとndb.put_multi 、各トランザクションの往復時間が短縮されます。

そして、毎回レコードを上書きしているので、なぜ get_or_insert を使用しているのでしょうか。書くだけでもいいです。どちらも作業負担を大幅に軽減します

于 2013-09-19T08:13:44.633 に答える