37

ブローカーとしてRedisでCelery(3.0.15)を使用しています。

Celeryキューに存在する特定の名前のタスクの数を照会する簡単な方法はありますか?

そして、フォローアップとして、Celeryキューに存在する特定の名前のすべてのタスクをキャンセルする方法はありますか?

監視および管理ガイドを確認しましたが、解決策が見つかりません。

4

5 に答える 5

40
# Retrieve tasks
# Reference: http://docs.celeryproject.org/en/latest/reference/celery.events.state.html
query = celery.events.state.tasks_by_type(your_task_name)

# Kill tasks
# Reference: http://docs.celeryproject.org/en/latest/userguide/workers.html#revoking-tasks
for uuid, task in query:
    celery.control.revoke(uuid, terminate=True)
于 2013-03-26T16:18:03.667 に答える
17

以前の回答では対処されておらず、気づいていないと人々を失望させる可能性があるという問題が1つあります。

すでに投稿されているソリューションの中で、ダニエルを少し変更して使用します。タスクをファイルにインポートし、その.name属性を使用して、に渡すタスク名を取得します.tasks_by_type()

app.control.revoke(
    [uuid for uuid, _ in
     celery.events.state.State().tasks_by_type(task.name)])

ただし、このソリューションでは、将来の実行がスケジュールされているタスクは無視されます。他の答えにコメントした何人かの人々のように、私が何を.tasks_by_type()返すかをチェックしたとき、私は空のリストを持っていました。そして確かに私のキューは空でした。しかし、将来実行される予定のタスクがあることは知っていました。これらが私の主なターゲットでした。実行することでそれらを見ることができましたcelery -A [app] inspect scheduledが、上記のコードの影響を受けませんでした。

これを行うことで、スケジュールされたタスクを取り消すことができました。

app.control.revoke(
    [scheduled["request"]["id"] for scheduled in
     chain.from_iterable(app.control.inspect().scheduled()
                         .itervalues())])

app.control.inspect().scheduled()キーがワーカー名で、値がスケジューリング情報のリストchain.from_iterableであるディクショナリを返します(したがって、その必要性はからインポートされitertoolsます)。タスク情報は"request"スケジューリング情報のフィールドにあり"id"、タスクIDが含まれています。失効した後でも、スケジュールされたタスクはスケジュールされたタスクの中に表示されることに注意してください。取り消されたスケジュールされたタスクは、タイマーが期限切れになるか、Celeryがクリーンアップ操作を実行するまで、スケジュールされたタスクのリストから削除されません。(ワーカーを再開すると、そのようなクリーンアップがトリガーされます。)

于 2015-09-03T18:39:30.517 に答える
4

これは1つのリクエストで実行できます。

app.control.revoke([
    uuid
    for uuid, _ in
    celery.events.state.State().tasks_by_type(task_name)
])
于 2015-04-22T04:57:36.267 に答える
3

セロリではいつものように、ここでの答えはまったくうまくいきませんでした。そこで、私はいつものことをして、redisを直接検査するソリューションを一緒にハッキングしました。どうぞ...

# First, get a list of tasks from redis:
import redis, json

r = redis.Redis(
    host=settings.REDIS_HOST,
    port=settings.REDIS_PORT,
    db=settings.REDIS_DATABASES['CELERY'],
)
l = r.lrange('celery', 0, -1)

# Now import the task you want so you can get its name
from my_django.tasks import my_task

# Now, import your celery app and iterate over all tasks 
# from redis and nuke the ones that have a matching name.
from my_django.celery_init import app
for task in l:
     task_headers = json.loads(task)['headers']
     task_name = task_headers["task"]
     if task_name == my_task.name:
         task_id = task_headers['id']
         print("Terminating: %s" % task_id)
         app.control.revoke(task_id, terminate=True)

この方法で取り消しても、プリフェッチされたタスクが取り消されない場合があるため、結果がすぐに表示されない場合があることに注意してください。

また、この回答は優先タスクをサポートしていません。それを行うように変更したい場合は、redisをハックする他の回答のヒントが必要になります。

于 2021-01-09T18:05:00.503 に答える
2

flower監視を提供しているようです。

https://github.com/mher/flower

セロリイベントを使用したリアルタイム監視

タスクの進行状況と履歴タスクの詳細(引数、開始時間、実行時間など)を表示する機能グラフと統計リモートコントロール

ワーカーのステータスと統計の表示ワーカーインスタンスのシャットダウンと再起動ワーカープールのサイズと自動スケール設定の制御ワーカーインスタンスが消費するキューの表示と変更現在実行中のタスクの表示スケジュールされたタスクの表示(ETA /カウントダウン)予約済みおよび取り消されたタスクの表示時間とレートの制限の適用構成ビューアータスクを取り消すか終了するHTTPAPI

OpenID認証

于 2013-03-26T16:20:37.743 に答える