すべての情報はメッセージに含まれています。
メッセージは、世界の反対側で転送中の場合もあれば、中間プロセッサによって消費される場合もあります。このため、送信後にメッセージを変更することはできません。
http://docs.celeryproject.org/en/latest/userguide/tasks.html#stateを参照してください
私の目標は、コードで指定したのと同じ順序でチェーンが実行されるようにすることです。また、チェーンでタスクが失敗した場合、次のタスクは実行されません。
注文はメッセージの一部として送信され、いずれかのタスクが失敗した場合は続行されません。
ここで、実行時にタスクを追加できるようにしたい場合は、情報をデータベースに保存し、タスク自体にそれをチェックさせて新しいタスクを呼び出すことができます。ただし、これを行うにはいくつかの課題があります。
1) チェーンの最初のタスクは、成功すると次のタスクを呼び出し、その後、次のタスクは次のタスクを呼び出します。
2) このプロセスにタスクを追加すると、最初のタスクが既に実行されている場合はどうなりますか? または2番目、または3番目?
ご想像のとおり、これを機能させるにはかなりの同期が必要になります。
簡単な解決策は、1 つのタスクが完了するのを待ってからコールバックを適用するタスクを作成することだと思います。
from celery import subtask
from celery.result import from_serializable
@app.task(bind=True)
def after_task(self, result, callback, errback=None):
result = from_serializable(result)
if not result.ready():
raise self.retry(countdown=1)
if task.successful():
subtask(callback).delay(result.get())
else:
if errback:
subtask(errback)()
def add_to_chain(result, callback, errback=None):
callback = callback.clone() # do not modify caller
new_result = callback.freeze() # sets id for callback, returns AsyncResult
new_result.parent = result
after_task.delay(result.serializable(), callback, errback)
return new_result
次に、次のように使用できます。
from tasks import t1, t2, t3
res = (t1.s(123) | t2.s() | t3.s())()
res = add_to_chain(t2.s())
ノート:
bind=True
は今後の 3.1 バージョンで新しく追加されました。古いバージョンでは、self 引数を削除してcurrent_task.retry
(get this from celery import current_task
) を使用する必要があります。
Signature.freeze
も 3.1 で新しくなりました。古いバージョンで同じことを行うには、次を使用できます。
from celery import uuid
def freeze(sig, _id=None):
opts = sig.options
try:
tid = opts['task_id']
except KeyError:
tid = opts['task_id'] = _id or uuid()
return sig.AsyncResult(tid)