44

状況によっては、そのタスク内からセロリ タスクを失敗させたいと考えています。私は次のことを試しました:

from celery.task import task
from celery import states

@task()
def run_simulation():
    if some_condition:
        run_simulation.update_state(state=states.FAILURE)
        return False

ただし、タスクは引き続き成功したと報告されます。

タスク sim.tasks.run_simulation[9235e3a7-c6d2-4219-bbc7-acf65c816e65] は 1.17847704887s で成功しました: False

タスクの実行中にのみ状態を変更できるようで、タスクが完了すると、セロリは状態を結果と見なすものに変更します(この質問を参照してください)。例外を発生させてタスクを失敗させることなく、タスクが失敗したことをセロリに返す方法はありますか?

4

3 に答える 3

37

例外を発生させずにタスクを失敗としてマークするには、タスクの状態を更新してから例外を発生さFAILUREIgnoreます。これは、任意の値を返すとタスクが成功として記録されるためです。例:

from celery import Celery, states
from celery.exceptions import Ignore

app = Celery('tasks', broker='amqp://guest@localhost//')

@app.task(bind=True)
def run_simulation(self):
    if some_condition:
        # manually update the task state
        self.update_state(
            state = states.FAILURE,
            meta = 'REASON FOR FAILURE'
        )

        # ignore the task so no other state is recorded
        raise Ignore()

ただし、最善の方法は、タスクから例外を発生させることです。カスタム例外を作成して、これらの失敗を追跡できます。

class TaskFailure(Exception):
   pass

そして、あなたのタスクからこの例外を発生させます:

if some_condition:
    raise TaskFailure('Failure reason')
于 2015-10-15T08:30:42.030 に答える
2

この質問について、Ask Solem から興味深い回答を得ました。彼は、問題を解決するために「after_return」ハンドラを提案しています。これは、将来的に興味深いオプションになる可能性があります。

その間、タスクを失敗させたいときにタスクから文字列「FAILURE」を返すだけで問題を解決し、次のようにそれを確認しました。

result = AsyncResult(task_id)
if result.state == 'FAILURE' or (result.state == 'SUCCESS' and result.get() == 'FAILURE'):
    # Failure processing task 
于 2011-10-11T07:24:36.903 に答える