10
@celery.task
def my_task(my_object):
    do_something_to_my_object(my_object)


#in the code somewhere 
tasks = celery.group([my_task.s(obj) for obj in MyModel.objects.all()])
group_task = tasks.apply_async()

質問: セロリには、グループ タスクの進行状況を検出する機能がありますか? 存在するタスクの数と処理されたタスクの数を取得できますか?

4

3 に答える 3

6

シェル(ipythonのタブのオートコンプリート)をいじくり回すと、group_task(オブジェクトである)必要なものを正確に提供celery.result.ResultSetするメソッドが呼び出されることがわかりました。completed_count

http://docs.celeryproject.org/en/latest/reference/celery.result.html#celery.result.ResultSet.completed_countのドキュメントも見つかりました

于 2013-02-28T16:03:09.160 に答える
4

@daloreの回答に基づく完全な実例を次に示します。

まずtasks.py

import time
from celery import Celery, group

app = Celery('tasks', broker='pyamqp://guest@127.0.0.1//', backend='redis://localhost')

@app.task(trail=True)
def add(x, y):
    time.sleep(1)
    return x + y

@app.task(trail=True)
def group_add(l1, l2):
    return group(add.s(x1, x2) for x1, x2 in zip(l1, l2))()

Docker を使用して redis サーバーを起動しますdocker run --name my-redis -p 6379:6379 -d redis

Docker を使用して RabbitMQ を開始しますdocker run -d --hostname my-rabbit --name my-rabbit -p 5672:5672 rabbitmq:alpine

別のシェルで単一プロセスのセロリ ワーカーを開始しますcelery -A tasks worker --loglevel=info -c 1

次に、以下のテスト スクリプトを実行します。

from tasks import group_add
from tqdm import tqdm

total = 10

l1 = range(total)
l2 = range(total)
delayed_results = group_add.delay(l1, l2)
delayed_results.get()  # Wait for parent task to be ready.

results = []
for result in tqdm(delayed_results.children[0], total=total):
    results.append(result.get())
print(results)

次のように、プログレス バーが 1 秒ごとに 10% ずつ増加する様子が表示されます。

50%|#####     | 5/10 [00:05<00:05,  1.01s/it
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

最後に、redis および rabbitmq コンテナーをクリーンアップします。

docker stop my-rabbit my-redis
docker rm my-rabbit my-redis
于 2019-01-15T15:18:14.007 に答える