10

各タスクのステータスを照会して、タスク チェーンの進行状況を取得しようとしています。しかし、チェーンをIDで取得すると、動作が異なるオブジェクトが得られます。

tasks.py で

from celery import Celery

celery = Celery('tasks')
celery.config_from_object('celeryconfig')

def unpack_chain(nodes): 
    while nodes.parent:
        yield nodes.parent
        nodes = nodes.parent
    yield nodes

@celery.task
def add(num, num2):
    return num + num2

ipython からクエリを実行すると...

In [43]: from celery import chain
In [44]: from tasks import celery, add, unpack_chain
In [45]: c = chain(add.s(3,3), add.s(10).set(countdown=100))
In [46]: m = c.apply_async()
In [47]: a = celery.AsyncResult(m.id)
In [48]: a == m
Out[48]: True
In [49]: a.id == m.id
Out[49]: True
In [50]: [t.status for t in list(unpack_chain(a))]
Out[50]: ['PENDING']
In [51]: [t.status for t in list(unpack_chain(m))]
Out[51]: ['PENDING', 'SUCCESS']

Redis で Python 2.7.3 と Celery 3.0.19 を使用します。

50 & 51でわかるように、 によって返される値celery.AsyncResultは元のチェーンとは異なります。

チェーン ID で元のチェーン タスク リストを取得するにはどうすればよいですか?

4

1 に答える 1

10

@Hernantz が言ったように、タスク ID だけから親チェーンを回復することはできません。ブローカーとして使用するものに応じて、可能な場合と不可能な場合があるキューを反復処理する必要があります。

しかし、ルックアップを行うための最後のタスク ID がある場合は、チェーンが作成されます。すべてのタスク ID を保存し、それらのステータスを調べる必要があるときにチェーンを再構築するだけで済みます。次の機能を使用できます。

def store(node):
    id_chain = []
    while node.parent:
      id_chain.append(node.id)
      node = node.parent
    id_chain.append(node.id)
    return id_chain

def restore(id_chain):
    id_chain.reverse()
    last_result = None
    for tid in id_chain:
        result = celery.AsyncResult(tid)
        result.parent = last_result
        last_result = result
    return last_result

チェーンから AsyncResult を最初に取得するときに store を呼び出します。AsyncResultその上で restore を呼び出すと、 chain のように のリンクされたリストが得られます。

于 2014-05-28T09:54:06.060 に答える