6

私はセロリでかなり奇妙な問題に直面しました:

一連のタスクがあり、そのうちの 1 つが例外を発生させ、何度か再試行します

chain = (err.si(1) | err.si(2))
result = chain.apply_async()
result.state
result.get()

タスクのコードは次のとおりです。

@celery.task(base=MyTask)
def err(x):
try:
    if x < 3:
        raise Exception
    else:
        return x+1

except Exception as exp:
    print "retrying"
    raise err.retry(args=[x],exc=exp,countdown=5,max_retries=3)

問題は、チェーン内のタスクが例外を発生させても、result.state が「PENDING」のままで、.get() がフリーズすることです。

最大再試行値に達した場合に備えて、タスクを失敗させようとしました:

class MyTask(celery.Task):
abstract = True
def after_return(self, status, retval, task_id, args, kwargs, einfo):
    if self.max_retries == self.request.retries:
        self.state = states.FAILURE

ただし、個別に実行されたタスクは FAILED としてマークされますが、チェーンで実行すると同じ結果 - PENDING & Freezed get が得られます。

タスクのいずれかが失敗し、結果の .get がタスクからスローされた例外を生成する必要があると、チェーンが失敗することを期待していました。

_ UPDATE _ apply_async で ALWAYS_EAGER=True を指定して取得したスタック トレース

result = chain.apply_async()

Exception                                 
Traceback (most recent call last)
<ipython-input-4-81202b369b5f> in <module>()
----> 1 result = chain.apply_async()

lib/python2.7/site-packages/celery/canvas.pyc in apply_async(self, args, kwargs,     **options)
    147         # For callbacks: extra args are prepended to the stored args.
    148         args, kwargs, options = self._merge(args, kwargs, options)
--> 149         return self.type.apply_async(args, kwargs, **options)
    150 
    151     def append_to_list_option(self, key, value):

/lib/python2.7/site-packages/celery/app/builtins.pyc in apply_async(self, args, kwargs, group_id, chord, task_id, **options)
    232                 task_id=None, **options):
    233             if self.app.conf.CELERY_ALWAYS_EAGER:
--> 234                 return self.apply(args, kwargs, **options)
    235             options.pop('publisher', None)
    236             tasks, results = self.prepare_steps(args, kwargs['tasks'])

lib/python2.7/site-packages/celery/app/builtins.pyc in apply(self, args, kwargs, subtask, **options)
    249             last, fargs = None, args  # fargs passed to first task only
    250             for task in kwargs['tasks']:
--> 251                 res = subtask(task).clone(fargs).apply(last and (last.get(), ))
    252                 res.parent, last, fargs = last, res, None
    253             return last

lib/python2.7/site-packages/celery/result.pyc in get(self, timeout, propagate, **kwargs)
    677         elif self.state in states.PROPAGATE_STATES:
    678             if propagate:
--> 679                 raise self.result
    680             return self.result
    681     wait = get

Exception: 
4

2 に答える 2

14

チェーンがある場合:

>>> c = a.s() | b.s() | c.s()
>>> res = c()
>>> res.get()

チェーンを呼び出すと、チェーン内のすべてのタスクに対して一意の ID が生成され、メッセージが送信され、チェーン内の最後の結果が返されます。

そのres.get()ため、チェーンの最後のタスクの結果を取得しようとするのは簡単です。

またparent、チェーンの進行状況を取得するためにトラバースできる属性で結果を装飾します。

>>> res                # result of c.s()
>>> res.parent         # result of b.s()
>>> res.parent.parent  # result of a.s()

途中でエラーをチェックしたい場合は、次のようにします。

def nodes(node):
    while node.parent:
        yield node
        node = node.parent
    yield node


values = [node.get(timeout=1) for node in reversed(list(nodes(res)))]
value = values[-1]
于 2012-10-01T11:48:59.713 に答える
0

本当はここで使うべきではないと思いますraise

ドキュメンテーションですべきではないと書かれている場合、例外をスローしてerr.retryいますraise err.retry

于 2012-10-01T10:39:09.203 に答える