2

「eta」引数を使用してセロリでタスクをスケジュールするリマインダー タイプのアプリがあります。リマインダー オブジェクトのパラメーターが変更された場合 (リマインダーの時刻など)、以前に送信されたタスクを取り消し、新しいタスクをキューに入れます。

celeryd の再起動時に取り消されたタスクを追跡する良い方法があるかどうか疑問に思っていました。その場で celeryd プロセスをスケールアップ/ダウンできるようにしたいのですが、revoke コマンドが送信された後に開始された celeryd プロセスは引き続きそのタスクを実行するようです。

これを行う 1 つの方法は、取り消されたタスク ID のリストを保持することですが、この方法ではリストが勝手に大きくなります。このリストを整理するには、タスクがもう RabbitMQ キューにないことを保証する必要がありますが、これは不可能のようです。

また、各 celeryd ワーカーに共有 --statedb ファイルを使用してみましたが、stateb ファイルはワーカーの終了時にのみ更新されるため、達成したいことには適していないようです。

前もって感謝します!

4

3 に答える 3

2

興味深い問題ですが、ブロードキャストコマンドを使用すると簡単に解決できるはずです。新しいワーカーが起動したときに、他のすべてのワーカーに、取り消されたタスクを新しいワーカーにダンプするように要求した場合。2つの新しいリモートコントロールコマンドを追加すると、、を使用して新しいコマンドを簡単に追加できます@Panel.register

モジュールcontrol.py:

from celery.worker import state
from celery.worker.control import Panel

@Panel.register
def bulk_revoke(panel, ids):
    state.revoked.update(ids)

@Panel.register
def broadcast_revokes(panel, destination):
    panel.app.control.broadcast("bulk_revoke", arguments={
         "ids": list(state.revoked)},
         destination=destination)

CELERY_IMPORTSに追加します。

CELERY_IMPORTS = ("control", )

現在欠落している唯一の問題はbroadcast_revokes、起動時に新しいワーカーがトリガーされるように接続することです。私はあなたがworker_ready これのために信号を使うことができると思います:

from celery import current_app as celery
from celery.signals import worker_ready

def request_revokes_at_startup(sender=None, **kwargs):
    celery.control.broadcast("broadcast_revokes",
                             destination=sender.hostname)
于 2012-04-09T15:04:53.813 に答える
0

私は自分のプロジェクトで同様のことをしなければならず、で使用celerycamしましdjango-admin-monitorた。モニターはタスクのスナップショットを取り、それらをデータベースに定期的に保存します。また、すべてのタスクのステータスを参照および確認するための優れたユーザーインターフェイスがあります。また、プロジェクトがDjangoベースでなくても使用できます。

于 2012-04-08T13:12:44.673 に答える
0

私はしばらく前にこれに似たものを実装しました、そして私が思いついた解決策はあなたのものと非常に似ていました。

この問題を解決する方法はTask、ジョブの実行時にワーカーにデータベースからオブジェクトをフェッチさせることでした(ドキュメントで推奨されているように、主キーを渡すことによって)。あなたの場合、リマインダーが送信される前に、ワーカーはタスクを実行する準備ができていることを確認するためのチェックを実行する必要があります。そうでない場合は、作業を行わずに単に戻る必要があります(ETAが変更され、別のワーカーが新しいジョブを引き受けると想定しています)。

于 2012-04-08T13:12:58.283 に答える