同様の質問がありました。ネット上のほとんどの例は時代遅れであり、ドキュメントはあまり役に立ちませんでしたが、ドキュメントにはソースへのリンクがあり、それを読んで私は役に立ちました。私の目的は、並列タスクをグループに編成することでした。グループは順番に実行する必要があります。そこで、タスクを個別に開始して割り当てる前に、タスクIDを生成することにしました。Celery4.3.0を使用しています
これが簡単な例です。
まず、実行をシーケンシャルにし、特定のグループの状態をチェックできるようにするためのダミータスクが必要でした。これはコールバックとして使用されるため、グループ内の他のすべてのタスクの後でのみ完了します。
@celery.task(bind=True, name="app.tasks.dummy_task")
def dummy_task( self, results=None, *args, **kwargs ):
return results
ここでの私のコメントは、IDの割り当て方法を説明しています。
from celery.utils import uuid
from celery import group, chord, chain
# Generating task ids,
# which can be saved to a db, sent to the client and so on
#
# This is done before executing any tasks
task_id_1 = uuid()
task_id_2 = uuid()
chord_callback_id_1 = uuid()
chord_callback_id_2 = uuid()
workflow_id = None
# Generating goups, using signatures
# the group may contain any number of tasks
group_1 = group(
[
celery.signature(
'app.tasks.real_task',
args=(),
kwargs = { 'email': some_email, 'data':some_data },
options = ( {'task_id': task_id_1 } )
)
]
)
group_2 = group(
[
celery.signature(
'app.tasks.real_task',
args=(),
kwargs = { 'email': some_email, 'data':some_data },
options = ( {'task_id': task_id_2 } )
)
]
)
# Creating callback task which will simply rely the result
# Using the task id, which has been generated before
#
# The dummy task start after all tasks in this group are completed
# This way we know that the group is completed
chord_callback = celery.signature(
'app.tasks.dummy_task',
options=( {'task_id': chord_callback_id_1 } )
)
chord_callback_2 = celery.signature(
'app.tasks.dummy_task',
options=( {'task_id': chord_callback_id_2 } )
)
# we can monitor each step status
# by its chord callback id
# the id of the chord callback
step1 = chord( group_1, body=chord_callback )
# the id of the chord callback
step2 = chord( group_2, body=chord_callback_2 )
# start the workflow execution
# the steps will execute sequentially
workflow = chain( step1, step2 )()
# the id of the last cord callback
workflow_id = workflow.id
# return any ids you need
print( workflow_id )
これにより、アプリ内のタスクのステータスを確認できます。
# This is a simplified example
# some code is omitted
from celery.result import AsyncResult
def task_status( task_id=None ):
# PENDING
# RECEIVED
# STARTED
# SUCCESS
# FAILURE
# REVOKED
# RETRY
task = AsyncResult(task_id)
response = {
'state': task.state,
}
return jsonify(response), 200