セロリ労働者のリアルタイム監視に関するセロリチュートリアルによると、労働者によって生成されたイベントをプログラムでキャプチャし、それに応じてアクションを実行することもできます。
私の質問は、Celery-Djangoアプリケーションで、この例のモニターをどのように統合できるかということです。
編集:チュートリアルのコード例は次のようになります。
from celery import Celery
def my_monitor(app):
state = app.events.State()
def announce_failed_tasks(event):
state.event(event)
task_id = event['uuid']
print('TASK FAILED: %s[%s] %s' % (
event['name'], task_id, state[task_id].info(), ))
with app.connection() as connection:
recv = app.events.Receiver(connection, handlers={
'task-failed': announce_failed_tasks,
'worker-heartbeat': announce_dead_workers,
})
recv.capture(limit=None, timeout=None, wakeup=True)
if __name__ == '__main__':
celery = Celery(broker='amqp://guest@localhost//')
my_monitor(celery)
したがって、ワーカーによって送信されたtask_failedイベントをキャプチャし、チュートリアルが示すようにそのtask_idを取得して、アプリケーション用に構成された結果バックエンドからこのタスクの結果を取得し、さらに処理したいと思います。私の問題は、アプリケーションの入手方法がはっきりしないことです。django-celeryプロジェクトでは、Celeryライブラリのインスタンス化が透過的ではないためです。
また、ワーカーがタスクの実行を終了したときに結果を処理する方法について、他のアイデアも受け入れています。