1

csvファイルにエクスポートしたい特定のタイプのエンティティが何百万もあります。次のコードは、BLOBを開いたままにし、次のバッチをタスクキューに延期しながら、1000のバッチでエンティティをBLOBに書き込みます。フェッチするエンティティがなくなると、blobがファイナライズされます。これは私のローカルテストのほとんどで機能するようですが、知りたかったのは次のとおりです。

本番データで実行する前に落とし穴やコーナーケースを見逃していて、データストアの読み取りに$sが発生した場合。

期限を超えた場合、またはバッチがblobに書き込まれている間にメモリが不足した場合、このコードはデフォルトで現在のバッチの開始になり、タスクを再度実行するため、多くの重複が発生する可能性があります。それを修正するための提案はありますか?

def entities_to_csv(entity_type,blob_file_name='',cursor='',batch_size=1000):
  more = True
  next_curs = None
  q = entity_type.query()
  results,next_curs,more = q.fetch_page(batch_size,start_cursor=Cursor.from_websafe_string(cursor))
  if results:
    try:
      if not blob_file_name:
        blob_file_name = files.blobstore.create(mime_type='text/csv',_blob_uploaded_filename='%s.csv' % entity_type.__name__)

      rows = [e.to_dict() for e in results]
      with files.open(blob_file_name, 'a') as f:
        writer = csv.DictWriter(f,restval='',extrasaction='ignore',fieldnames=results[0].keys())
        writer.writerows(rows)

      if more:
        deferred.defer(entity_type,blob_file_name,next_curs.to_websafe_string())
      else:
        files.finalize(blob_file_name)
    except DeadlineExceededError:
      deferred.defer(entity_type,blob_file_name,cursor)

コードの後半では、次のようになります。

deferred.defer(entities_to_csv,Song)
4

2 に答える 2

1

現在のソリューションの問題は、ブロブストアへのプリフォームへの書き込みごとにメモリが増加することです。ブロブストアは不変であり、すべてのデータをメモリから一度に書き込みます。

メモリ内のすべてのレコードを保持できるバックエンドでジョブを実行する必要があります。アプリケーションでバックエンドを定義し、 defer を呼び出す必要があります_target='<backend name>'

于 2012-07-11T14:44:09.657 に答える
0

この Google I/O ビデオをチェックしてください。ビデオの 23:15 あたりから、MapReduce を使用して何をしたいのかを説明しています。欲しいコードは27:19にあります

https://developers.google.com/events/io/sessions/gooio2012/307/

于 2012-07-11T15:14:16.860 に答える