取り組む必要がある複雑なシナリオがあります。
Celery を使用してタスクを並行して実行しています。私のタスクには HTTP リクエストが含まれており、そのような目的でイベントレットとともに Celery を使用する予定です。
私のシナリオを説明しましょう:
並行して実行できる2つのタスクと、これら2つのタスクの出力で作業する必要がある3番目のタスクがあるため、Celeryグループを使用して2つのタスクを実行し、Celeryチェーンを使用して出力を3番目のタスクに渡して作業します彼らが終わるとき。
複雑になりました。3 番目のタスクは、並行して実行したい複数のタスクを生成する必要があり、すべての出力をまとめて別のタスクで処理したいと考えています。
そこで、すべての情報を処理するためのチェーンと一緒に、複数のタスクのグループを作成しました。
Celery の並行プリミティブに関する基本的な情報が不足していると思います。うまく機能する 1 つのセロリ タスクがありましたが、それを高速化する必要がありました。
これは、コードの単純化されたサンプルです。
@app.task
def task2():
return "aaaa"
@app.task
def task3():
return "bbbb"
@app.task
def task4():
work = group(...) | task5.s(...)
work()
@app.task
def task1():
tasks = [task2.s(a, b), task3.s(c, d)]
work = group(tasks) | task4.s()
return work()
これが私がこの操作を開始する方法です:
task = tasks1.apply_async(kwargs=kwargs, queue='queue1')
task.id を保存し、30 秒ごとにサーバーをプルして、次のようにして結果が得られるかどうかを確認します。
results = tasks1.AsyncResult(task_id)
if results.ready():
res = results.get()