9

python 2.6、rabbitmqバックエンド、およびdjangoでcelery2.4.1を使用しています。ワーカーがシャットダウンした場合に、タスクを適切にクリーンアップできるようにしたいと思います。私の知る限り、タスクデストラクタを提供できないので、worker_shutdownシグナルにフックしてみました。

注:AbortableTaskはデータベースバックエンドでのみ機能するため、使用できません。

from celery.signals import worker_shutdown

@task
def mytask(*args)

  obj = DoStuff()

  def shutdown_hook(*args):
     print "Worker shutting down"
     # cleanup nicely
     obj.stop()

  worker_shutdown.connect(shutdown_hook)

  # blocking call that monitors a network connection
  obj.stuff()

ただし、シャットダウンフックが呼び出されることはありません。ワーカーをCtrl-Cで実行してもタスクは強制終了されないため、シェルから手動で強制終了する必要があります。

それで、これが適切な方法ではない場合、タスクを正常にシャットダウンできるようにするにはどうすればよいですか?

4

1 に答える 1

11

worker_shutdownMainProcess子プールワーカーではなく、によってのみ送信されます。すべてのworker_*信号except for worker_process_initは、を参照してくださいMainProcess

ただし、シャットダウンフックが呼び出されることはありません。ワーカーをCtrl-Cで実行してもタスクは強制終了されないため、シェルから手動で強制終了する必要があります。

ワーカーは、通常の(ウォーム)シャットダウン時にタスクを終了することはありません。タスクが完了するまでに数日かかる場合でも、ワーカーはタスクが完了するまでシャットダウンを完了しません。--soft-time-limit、またはを設定--time-limitして、タスクを終了しても問題がないことをインスタンスに通知できます。

したがって、あらゆる種類のプロセスクリーンアッププロセスを追加するには、最初にタスクが実際に完了できることを確認する必要があります。それが起こる前にクリーンアップが呼び出されないので。

プールワーカープロセスにクリーンアップステップを追加するには、次のようなものを使用できます。

from celery import platforms
from celery.signals import worker_process_init

def cleanup_after_tasks(signum, frame):
    # reentrant code here (see http://docs.python.org/library/signal.html)

def install_pool_process_sighandlers(**kwargs):
    platforms.signals["TERM"] = cleanup_after_tasks
    platforms.signals["INT"] = cleanup_after_tasks

worker_process_init.connect(install_pool_process_sighandlers)
于 2011-11-22T16:47:36.793 に答える