21

タスクが失われたかどうかを判断して再試行する方法はありますか?


失われた理由は、ディスパッチャのバグまたはワーカースレッドのクラッシュである可能性があると思います。

それらを再試行することを計画していましたが、どのタスクを廃止する必要があるかを判断する方法がわかりませんか?

そして、このプロセスを自動的に行う方法は?新しいタスクを作成する独自のカスタムスケジューラを使用できますか?

編集:ドキュメントから、RabbitMQがタスクを失うことはないことがわかりましたが、タスクの実行中にワーカースレッドがクラッシュするとどうなりますか?

4

1 に答える 1

35

必要なのは設定することです

CELERY_ACKS_LATE = 真

遅延確認とは、タスクの実行直前ではなく、実行後にタスク メッセージが確認されることを意味します。これはデフォルトの動作です。このようにして、ワーカーがクラッシュした場合でも、ウサギの MQ にはメッセージが表示されます。

明らかに、完全なクラッシュ (ウサギ + ワーカー) と同時に、タスクの開始と終了のログを実装する場合を除いて、タスクを回復する方法はありません。個人的には、タスクが開始されるたびに mongodb に 1 行を書き込み、タスクが終了するたびに (結果から独立して) 別の行を書き込みます。このようにして、mongo ログを分析することで、どのタスクが中断されたかを知ることができます。

セロリの基本タスク クラスのメソッド__call__とメソッドをオーバーライドすることで、簡単に実行できます。after_return

以下は、taskLogger クラスをコンテキスト マネージャーとして使用するコードの一部です (エントリ ポイントとエグジット ポイントを使用)。taskLogger クラスは、タスク情報を含む行を mongodb インスタンスに書き込むだけです。

def __call__(self, *args, **kwargs):
    """In celery task this function call the run method, here you can
    set some environment variable before the run of the task"""

    #Inizialize context managers    

    self.taskLogger = TaskLogger(args, kwargs)
    self.taskLogger.__enter__()

    return self.run(*args, **kwargs)

def after_return(self, status, retval, task_id, args, kwargs, einfo):
    #exit point for context managers
    self.taskLogger.__exit__(status, retval, task_id, args, kwargs, einfo)

これが役立つことを願っています

于 2011-03-17T13:18:37.133 に答える