また、失敗したタスクでチェーンが機能しない理由を理解することにも興味があります。
私はいくつかのセロリコードを掘り下げましたが、これまでに見つけたものは次のとおりです。
実装は app.builtins.py で行われます
@shared_task
def add_chain_task(app):
from celery.canvas import chord, group, maybe_subtask
_app = app
class Chain(app.Task):
app = _app
name = 'celery.chain'
accept_magic_kwargs = False
def prepare_steps(self, args, tasks):
steps = deque(tasks)
next_step = prev_task = prev_res = None
tasks, results = [], []
i = 0
while steps:
# First task get partial args from chain.
task = maybe_subtask(steps.popleft())
task = task.clone() if i else task.clone(args)
i += 1
tid = task.options.get('task_id')
if tid is None:
tid = task.options['task_id'] = uuid()
res = task.type.AsyncResult(tid)
# automatically upgrade group(..) | s to chord(group, s)
if isinstance(task, group):
try:
next_step = steps.popleft()
except IndexError:
next_step = None
if next_step is not None:
task = chord(task, body=next_step, task_id=tid)
if prev_task:
# link previous task to this task.
prev_task.link(task)
# set the results parent attribute.
res.parent = prev_res
results.append(res)
tasks.append(task)
prev_task, prev_res = task, res
return tasks, results
def apply_async(self, args=(), kwargs={}, group_id=None, chord=None,
task_id=None, **options):
if self.app.conf.CELERY_ALWAYS_EAGER:
return self.apply(args, kwargs, **options)
options.pop('publisher', None)
tasks, results = self.prepare_steps(args, kwargs['tasks'])
result = results[-1]
if group_id:
tasks[-1].set(group_id=group_id)
if chord:
tasks[-1].set(chord=chord)
if task_id:
tasks[-1].set(task_id=task_id)
result = tasks[-1].type.AsyncResult(task_id)
tasks[0].apply_async()
return result
def apply(self, args=(), kwargs={}, **options):
tasks = [maybe_subtask(task).clone() for task in kwargs['tasks']]
res = prev = None
for task in tasks:
res = task.apply((prev.get(), ) if prev else ())
res.parent, prev = prev, res
return res
return Chain
prepare_steps
prev_task
最後に次のタスクにリンクされていることがわかります。prev_task が失敗すると、次のタスクは呼び出されません。
前のタスクから次のタスクに link_error を追加してテストしています:
if prev_task:
# link and link_error previous task to this task.
prev_task.link(task)
prev_task.link_error(task)
# set the results parent attribute.
res.parent = prev_res
ただし、次のタスクは両方のケースを処理する必要があります (おそらく、不変に構成されている場合を除きます。たとえば、それ以上の引数を受け入れません)。
次のような構文を許可することで、チェーンがそれをサポートできると思います。
c = chain(t1, (t2, t1e), (t3, t2e))
つまり:
t1
link
へt2
とlink_error
へt1e
t2
link
へt3
とlink_error
へt2e