2

他の誰かがこの問題を抱えているかどうかはわかりませんが、appengine 開発サーバーでタスクをチェーンするためにカーソルを使用すると、「クエリ オフセットが大きすぎます」という例外が発生します (ライブで発生するかどうかはわかりません)。

1 回のクエリで 4000 件以上のレコードが処理された後にカーソルを要求すると、エラーが発生します。

オフセットがカーソルと関係があることを知りませんでした。おそらく、アプリ エンジン用の sdk の癖にすぎません。

修正するには、タスクが延期される前に許容される時間を短縮する (一度に処理されるレコードが少なくなるようにする) か、経過時間を確認するときに、処理されたレコードの数がまだ範囲内にあることを確認することもできます。たとえば、time.time() > end_time またはcount == 2000の場合。カウントをリセットし、タスクを延期します。2000 は任意の数値です。上限をどうするかはわかりません。

編集:

上記の変更を行った後、実行が終了することはありません。with_cursor(cursor) コードが呼び出されていますが、毎回最初から開始されているようです。明らかな何かが欠けていますか?

例外の原因となるコードは次のとおりです。

テーブル「Transact」には 4800 行あります。time.time() > end_time が true のときに、transacts.cursor() が呼び出されると、エラーが発生します。カーソルが要求された時点で 4510 レコードが処理されたため、エラーが発生したようです (開発サーバーでは、他の場所ではテストされていません)。

def some_task(trans):
  tts = db.get(trans)
  for t in tts:
    #logging.info('in some_task')
    pass


def test_cursor(request):
  ret = test_cursor_task()


def test_cursor_task(cursor = None):
  startDate = datetime.datetime(2010,7,30)
  endDate = datetime.datetime(2010,8,30)
  end_time = time.time() + 20.0   
  transacts = Transact.all().filter('transactionDate >', startDate).filter('transactionDate <=',endDate)

  count =0
  if cursor:
      transacts.with_cursor(cursor)
  trans =[]
  logging.info('queue_trans')
  for tran in transacts:
    count+=1
    #trans.append(str(tran))   
    trans.append(str(tran.key()))   

    if len(trans)==20:
            deferred.defer(some_task, trans,  _countdown = 500)                
            trans =[]            
    if time.time() > end_time:
        logging.info(count)            
        if len(trans)>0:                
           deferred.defer(some_task, trans, _countdown = 500)
           trans =[]
        logging.info('time limit exceeded setting next call to queue')
        cursor = transacts.cursor()
        deferred.defer(test_cursor_task, cursor)
        logging.info('returning false')
        return False


    return True


  return HttpResponse('')

これが誰かに役立つことを願っています。

ありがとうバート

4

1 に答える 1

1

反復機能を使用せずにこれをもう一度試してください。

#...
CHUNK = 500
objs = transacts.fetch(CHUNK)

for tran in objs:
    do_your_stuff

if len(objs) == CHUNK:
    deferred.defer(my_task_again, cursor=str(transacts.cursor()))

これは私にとってはうまくいきます。

于 2012-02-27T21:03:29.127 に答える