19

いくつかのタスクを実行するセロリ チェーンがあります。各タスクが失敗して再試行される可能性があります。簡単な例については、以下を参照してください。

from celery import task

@task(ignore_result=True)
def add(x, y, fail=True):
    try:
        if fail:
            raise Exception('Ugly exception.')
        print '%d + %d = %d' % (x, y, x+y)
    except Exception as e:
        raise add.retry(args=(x, y, False), exc=e, countdown=10)

@task(ignore_result=True)
def mul(x, y):
    print '%d * %d = %d' % (x, y, x*y)

そしてチェーン:

from celery.canvas import chain
chain(add.si(1, 2), mul.si(3, 4)).apply_async()

2 つのタスクを実行すると (何も失敗しないと仮定して)、次のように表示されます。

1 + 2 = 3
3 * 4 = 12

ただし、追加タスクが最初に失敗し、その後の再試行呼び出しが成功した場合、チェーン内の残りのタスクは実行されません。つまり、追加タスクが失敗し、チェーン内の他のすべてのタスクが実行されず、数秒後に、 add タスクが再度実行されて成功し、チェーン内の残りのタスク (この場合は mul.si(3, 4)) は実行されません。

セロリは、失敗したタスクから失敗したチェーンを継続する方法を提供しますか? そうでない場合、これを達成し、タスクが数回再試行された場合でも、チェーンのタスクが指定された順序で実行され、前のタスクが正常に実行された後にのみ実行されるようにするための最良のアプローチは何でしょうか?

注1:この問題は、次のようにすることで解決できます

add.delay(1, 2).get()
mul.delay(3, 4).get()

しかし、チェーンが失敗したタスクで機能しない理由を理解することに興味があります。

4

2 に答える 2

12

あなたはバグを見つけました:)

https://github.com/celery/celery/commit/b2b9d922fdaed5571cf685249bdc46f28acacde3で修正され 、3.0.4 の一部になります。

于 2012-07-25T17:55:10.230 に答える
0

また、失敗したタスクでチェーンが機能しない理由を理解することにも興味があります。

私はいくつかのセロリコードを掘り下げましたが、これまでに見つけたものは次のとおりです。

実装は app.builtins.py で行われます

@shared_task
def add_chain_task(app):
    from celery.canvas import chord, group, maybe_subtask
    _app = app

    class Chain(app.Task):
        app = _app
        name = 'celery.chain'
        accept_magic_kwargs = False

        def prepare_steps(self, args, tasks):
            steps = deque(tasks)
            next_step = prev_task = prev_res = None
            tasks, results = [], []
            i = 0
            while steps:
                # First task get partial args from chain.
                task = maybe_subtask(steps.popleft())
                task = task.clone() if i else task.clone(args)
                i += 1
                tid = task.options.get('task_id')
                if tid is None:
                    tid = task.options['task_id'] = uuid()
                res = task.type.AsyncResult(tid)

                # automatically upgrade group(..) | s to chord(group, s)
                if isinstance(task, group):
                    try:
                        next_step = steps.popleft()
                    except IndexError:
                        next_step = None
                if next_step is not None:
                    task = chord(task, body=next_step, task_id=tid)
                if prev_task:
                    # link previous task to this task.
                    prev_task.link(task)
                    # set the results parent attribute.
                    res.parent = prev_res

                results.append(res)
                tasks.append(task)
                prev_task, prev_res = task, res

            return tasks, results

        def apply_async(self, args=(), kwargs={}, group_id=None, chord=None,
                task_id=None, **options):
            if self.app.conf.CELERY_ALWAYS_EAGER:
                return self.apply(args, kwargs, **options)
            options.pop('publisher', None)
            tasks, results = self.prepare_steps(args, kwargs['tasks'])
            result = results[-1]
            if group_id:
                tasks[-1].set(group_id=group_id)
            if chord:
                tasks[-1].set(chord=chord)
            if task_id:
                tasks[-1].set(task_id=task_id)
                result = tasks[-1].type.AsyncResult(task_id)
            tasks[0].apply_async()
            return result

        def apply(self, args=(), kwargs={}, **options):
            tasks = [maybe_subtask(task).clone() for task in kwargs['tasks']]
            res = prev = None
            for task in tasks:
                res = task.apply((prev.get(), ) if prev else ())
                res.parent, prev = prev, res
            return res
    return Chain

prepare_steps prev_task最後に次のタスクにリンクされていることがわかります。prev_task が失敗すると、次のタスクは呼び出されません。

前のタスクから次のタスクに link_error を追加してテストしています:

if prev_task:
    # link and link_error previous task to this task.
    prev_task.link(task)
    prev_task.link_error(task)
    # set the results parent attribute.
    res.parent = prev_res

ただし、次のタスクは両方のケースを処理する必要があります (おそらく、不変に構成されている場合を除きます。たとえば、それ以上の引数を受け入れません)。

次のような構文を許可することで、チェーンがそれをサポートできると思います。

c = chain(t1, (t2, t1e), (t3, t2e))

つまり:

t1 linkt2link_errort1e

t2 linkt3link_errort2e

于 2012-07-23T04:37:40.033 に答える