118

タスクがセロリで実行されているかどうかをどのように確認しますか(具体的には、私はcelery-djangoを使用しています)?

ドキュメントを読み、グーグルで検索しましたが、次のような呼び出しが表示されません。

my_example_task.state() == RUNNING

私のユースケースは、トランスコーディング用の外部(java)サービスがあることです。トランスコードするドキュメントを送信するときに、そのサービスを実行しているタスクが実行されているかどうかを確認し、実行されていない場合は、それを(再)開始します。

私は現在の安定したバージョン-2.4を使用していると思います。

4

13 に答える 13

115

task_id(.delay()から指定)を返し、後でceleryインスタンスに状態について尋ねます。

x = method.delay(1,2)
print x.task_id

質問するときは、次のtask_idを使用して新しいAsyncResultを取得します。

from celery.result import AsyncResult
res = AsyncResult("your-task-id")
res.ready()
于 2012-01-27T14:41:36.377 に答える
92

AsyncResultタスクIDからオブジェクトを作成することは、 FAQで推奨されている方法であり、タスクIDしか持っていない場合にタスクのステータスを取得します。

ただし、Celery 3.xの時点では、注意を払わないと人々を噛む可能性のある重大な警告があります。それは実際には特定のユースケースシナリオに依存します。

デフォルトでは、Celeryは「実行中」の状態を記録しません。

Celeryがタスクの実行を記録するには、に設定task_track_startedする必要がありますTrue。これをテストする簡単なタスクは次のとおりです。

@app.task(bind=True)
def test(self):
    print self.AsyncResult(self.request.id).state

task_track_startedFalseデフォルトの場合、タスクが開始されていても状態が表示さPENDINGれます。に設定task_track_startedするTrueと、状態はになりますSTARTED

状態PENDINGは「わからない」という意味です。

AsyncResult状態のあるとPENDINGは、Celeryがタスクのステータスを知らないということ以上の意味はありません。これは、さまざまな理由が原因である可能性があります。

一つにAsyncResultは、無効なタスクIDで構築できます。このような「タスク」は、Celeryによって保留中と見なされます。

>>> task.AsyncResult("invalid").status
'PENDING'

わかりました。明らかに無効なIDをにフィードする人は誰もいませんAsyncResult。十分に公平ですが、それはまたAsyncResult、正常に実行されたが、Celeryがであると忘れているタスクを考慮する効果もありますPENDING繰り返しますが、一部のユースケースシナリオでは、これが問題になる可能性があります。問題の一部は、結果バックエンドでの「トゥームストーン」の可用性に依存するため、タスクの結果を保持するようにCeleryがどのように構成されているかにかかっています。(「トゥームストーン」は、タスクがどのように終了したかを記録するデータチャンクのCeleryドキュメントでの使用という用語です。)の場合、使用はAsyncResultまったく機能しません。さらに厄介な問題は、Celeryがデフォルトでトゥームストーンを期限切れにすることです。ザtask_ignore_resultTrueresult_expiresデフォルトの設定は24時間に設定されています。したがって、タスクを起動し、IDを長期保存に記録し、24時間後にそれを使用して作成するAsyncResultと、ステータスはになりますPENDING

すべての「実際のタスク」はそのPENDING状態で始まります。したがってPENDING、タスクに参加するということは、タスクが要求されたが、(何らかの理由で)これ以上進行しなかったことを意味する可能性があります。または、タスクは実行されたが、Celeryがその状態を忘れたことを意味する場合があります。

痛い!AsyncResult私にはうまくいきません。他に何ができますか?

タスク自体を追跡するよりも、目標を追跡する方が好きです。私はいくつかのタスク情報を保持していますが、それは本当に目標を追跡することの二次的なものです。目標は、Celeryから独立したストレージに保存されます。リクエストが達成された目標に応じて計算を実行する必要がある場合、目標がすでに達成されているかどうかを確認します。達成されている場合は、このキャッシュされた目標を使用します。それ以外の場合は、目標に影響を与えるタスクを開始し、に送信します。 HTTP要求を行ったクライアントは、結果を待つ必要があることを示す応答を要求しました。


上記の変数名とハイパーリンクはCelery4.x用です。3.xでは、対応する変数とハイパーリンクは次のとおりです。 CELERY_TRACK_STARTED、、。CELERY_IGNORE_RESULTCELERY_TASK_RESULT_EXPIRES

于 2016-07-08T13:30:40.343 に答える
69

すべてTaskのオブジェクトには、オブジェクト.requestを含むプロパティがありAsyncRequestます。したがって、次の行はタスクの状態を示していますtask

task.AsyncResult(task.request.id).state
于 2012-01-28T14:59:57.713 に答える
17

カスタム状態を作成して、タスク実行時の値を更新することもできます。この例はドキュメントからのものです:

@app.task(bind=True)
def upload_files(self, filenames):
    for i, file in enumerate(filenames):
        if not self.request.called_directly:
            self.update_state(state='PROGRESS',
                meta={'current': i, 'total': len(filenames)})

http://celery.readthedocs.org/en/latest/userguide/tasks.html#custom-states

于 2015-01-03T04:47:54.903 に答える
17

古い質問ですが、最近この問題に遭遇しました。

task_idを取得しようとしている場合は、次のように実行できます。

import celery
from celery_app import add
from celery import uuid

task_id = uuid()
result = add.apply_async((2, 2), task_id=task_id)

これで、task_idが正確にわかったので、それを使用してAsyncResultを取得できます。

# grab the AsyncResult 
result = celery.result.AsyncResult(task_id)

# print the task id
print result.task_id
09dad9cf-c9fa-4aee-933f-ff54dae39bdf

# print the AsyncResult's status
print result.status
SUCCESS

# print the result returned 
print result.result
4
于 2016-07-10T01:35:14.690 に答える
11

セロリFAQからこのAPIを使用するだけです

result = app.AsyncResult(task_id)

これは正常に機能します。

于 2018-11-29T06:33:10.690 に答える
2

2020年の回答:

#### tasks.py
@celery.task()
def mytask(arg1):
    print(arg1)

#### blueprint.py
@bp.route("/args/arg1=<arg1>")
def sleeper(arg1):
    process = mytask.apply_async(args=(arg1,)) #mytask.delay(arg1)
    state = process.state
    return f"Thanks for your patience, your job {process.task_id} \
             is being processed. Status {state}"
于 2020-03-18T09:20:44.213 に答える
0

試す:

task.AsyncResult(task.request.id).state

これにより、Celeryタスクのステータスが提供されます。Celery TaskがすでにFAILURE状態にある場合、例外がスローされます。

raised unexpected: KeyError('exc_type',)

于 2016-05-07T05:44:33.290 に答える
0

で役立つ情報を見つけました

セレリープロジェクト労働者ガイド検査-労働者

私の場合、Celeryが実行されているかどうかを確認しています。

inspect_workers = task.app.control.inspect()
if inspect_workers.registered() is None:
    state = 'FAILURE'
else:
    state = str(task.state) 

あなたはあなたのニーズを得るために検査で遊ぶことができます。

于 2017-07-12T22:53:20.900 に答える
0
  • まず、セロリAPPで:</ li>

vi my_celery_apps / app1.py

app = Celery(worker_name)
  • 次に、タスクファイルに変更し、セロリアプリモジュールからアプリをインポートします。

viタスク/task1.py

from my_celery_apps.app1 import app

app.AsyncResult(taskid)

try:
   if task.state.lower() != "success":
        return
except:
    """ do something """

于 2019-09-24T07:48:17.410 に答える
-1

簡単なタスクの場合は、http: //flower.readthedocs.io/en/latest/screenshots.htmlhttp://policystat.github.io/jobtastic/を使用して監視を行うことができます。

複雑なタスクの場合、他の多くのモジュールを処理するタスクを言います。特定のタスクユニットで進行状況とメッセージを手動で記録することをお勧めします。

于 2017-03-31T06:18:29.470 に答える
-1
res = method.delay()
    
print(f"id={res.id}, state={res.state}, status={res.status} ")

print(res.get())
于 2020-07-26T03:38:34.277 に答える
-3

上記とは別に、フラワータスクのステータスを使用したプログラムによるアプローチは簡単に確認できます。

セロリイベントを使用したリアルタイム監視。Flowerは、Celeryクラスターを監視および管理するためのWebベースのツールです。

  1. タスクの進行状況と履歴
  2. タスクの詳細(引数、開始時間、実行時間など)を表示する機能
  3. グラフと統計

公式文書: 花-セロリ監視ツール

インストール:

$ pip install flower

使用法:

http://localhost:5555

更新:これにはバージョン管理の問題があります。flower(バージョン= 0.9.7)はセロリ(バージョン= 4.4.7)でのみ機能します。flowerをインストールすると、上位バージョンのセロリが4.4.7にアンインストールされ、これは機能しません。登録されたタスク

于 2018-06-05T17:13:19.797 に答える