@daloreの回答に基づく完全な実例を次に示します。
まずtasks.py
。
import time
from celery import Celery, group
app = Celery('tasks', broker='pyamqp://guest@127.0.0.1//', backend='redis://localhost')
@app.task(trail=True)
def add(x, y):
time.sleep(1)
return x + y
@app.task(trail=True)
def group_add(l1, l2):
return group(add.s(x1, x2) for x1, x2 in zip(l1, l2))()
Docker を使用して redis サーバーを起動しますdocker run --name my-redis -p 6379:6379 -d redis
。
Docker を使用して RabbitMQ を開始しますdocker run -d --hostname my-rabbit --name my-rabbit -p 5672:5672 rabbitmq:alpine
。
別のシェルで単一プロセスのセロリ ワーカーを開始しますcelery -A tasks worker --loglevel=info -c 1
。
次に、以下のテスト スクリプトを実行します。
from tasks import group_add
from tqdm import tqdm
total = 10
l1 = range(total)
l2 = range(total)
delayed_results = group_add.delay(l1, l2)
delayed_results.get() # Wait for parent task to be ready.
results = []
for result in tqdm(delayed_results.children[0], total=total):
results.append(result.get())
print(results)
次のように、プログレス バーが 1 秒ごとに 10% ずつ増加する様子が表示されます。
50%|##### | 5/10 [00:05<00:05, 1.01s/it
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
最後に、redis および rabbitmq コンテナーをクリーンアップします。
docker stop my-rabbit my-redis
docker rm my-rabbit my-redis