私はセロリでかなり奇妙な問題に直面しました:
一連のタスクがあり、そのうちの 1 つが例外を発生させ、何度か再試行します
chain = (err.si(1) | err.si(2))
result = chain.apply_async()
result.state
result.get()
タスクのコードは次のとおりです。
@celery.task(base=MyTask)
def err(x):
try:
if x < 3:
raise Exception
else:
return x+1
except Exception as exp:
print "retrying"
raise err.retry(args=[x],exc=exp,countdown=5,max_retries=3)
問題は、チェーン内のタスクが例外を発生させても、result.state が「PENDING」のままで、.get() がフリーズすることです。
最大再試行値に達した場合に備えて、タスクを失敗させようとしました:
class MyTask(celery.Task):
abstract = True
def after_return(self, status, retval, task_id, args, kwargs, einfo):
if self.max_retries == self.request.retries:
self.state = states.FAILURE
ただし、個別に実行されたタスクは FAILED としてマークされますが、チェーンで実行すると同じ結果 - PENDING & Freezed get が得られます。
タスクのいずれかが失敗し、結果の .get がタスクからスローされた例外を生成する必要があると、チェーンが失敗することを期待していました。
_ UPDATE _ apply_async で ALWAYS_EAGER=True を指定して取得したスタック トレース
result = chain.apply_async()
Exception
Traceback (most recent call last)
<ipython-input-4-81202b369b5f> in <module>()
----> 1 result = chain.apply_async()
lib/python2.7/site-packages/celery/canvas.pyc in apply_async(self, args, kwargs, **options)
147 # For callbacks: extra args are prepended to the stored args.
148 args, kwargs, options = self._merge(args, kwargs, options)
--> 149 return self.type.apply_async(args, kwargs, **options)
150
151 def append_to_list_option(self, key, value):
/lib/python2.7/site-packages/celery/app/builtins.pyc in apply_async(self, args, kwargs, group_id, chord, task_id, **options)
232 task_id=None, **options):
233 if self.app.conf.CELERY_ALWAYS_EAGER:
--> 234 return self.apply(args, kwargs, **options)
235 options.pop('publisher', None)
236 tasks, results = self.prepare_steps(args, kwargs['tasks'])
lib/python2.7/site-packages/celery/app/builtins.pyc in apply(self, args, kwargs, subtask, **options)
249 last, fargs = None, args # fargs passed to first task only
250 for task in kwargs['tasks']:
--> 251 res = subtask(task).clone(fargs).apply(last and (last.get(), ))
252 res.parent, last, fargs = last, res, None
253 return last
lib/python2.7/site-packages/celery/result.pyc in get(self, timeout, propagate, **kwargs)
677 elif self.state in states.PROPAGATE_STATES:
678 if propagate:
--> 679 raise self.result
680 return self.result
681 wait = get
Exception: