0

次のプロジェクトにはスケジューラが必要で、Django を使用してコーディングしているため、Celeryを使用しました。

私が探しているのは、タスクが完了したときに Django に通知する方法です。そのため、データベースを更新し、SSEを使用してユーザーに通知できます。これはすべて、すべてのロジックをタスクに入れるだけで非常に簡単に実行できます。しかし、複数のセロリ ワーカーを計画している場合はどうすればよいでしょうか。

単一のワーカーのケースをカバーするためにオンラインでたくさんの情報を見つけましたが、複数のワーカーがいる場合の問題をカバーするものは多くありません.

私が考えたのは、ワーカーから Web サーバーへの http コールバックを使用して、タスクが完了したことを知らせることでした。celery.task.httpを見ると有望に見えましたが、必要なことをしませんでした。

シグナルを使用して手動の http 呼び出しを接続するソリューションはありますか? それとも私は間違った道を進んでいますか?これは一般的な問題ではありませんか?これをよりエレガントに解決するにはどうすればよいでしょうか?

4

2 に答える 2

1

では、Django に伝えるとはどういう意味ですか? Celeryタスクを初期化したdjangoリクエストは、このタスクが終了した時点でまだ生きていると理解していますか? その場合、いくつかのストレージ(データベース、memcachedなど)を確認できます。SSEを送信します。ほら、それを行う方法が1つあります。1. django ビュー送信タスクを Celery に送信した後、無限ループ (またはタイムアウト 60 秒のループ) に進み、memcached で結果を待ちます。

  1. Celery はタスクの実行を取得し、結果を memcached に貼り付けます。

  2. Django ビューは新しい結果を取得し、ループを終了して SSE を送信します。

次のバリアントは

  1. DjangoビューはタスクをCeleryに送信し、返します

  2. セロリはタスクを実行します。実行後、django アプリに単純な HTTP リクエストを送信します。

  3. Django は Celery から http リクエストを受け取り、params を解析して SSE をユーザーに再度送信します

于 2013-08-13T11:36:52.357 に答える
0

ここに私が望むことをするように見えるいくつかのコードがあります:

ジャンゴの設定で:

CELERY_ANNOTATIONS = {
    "*": {
        "on_failure": celery_handlers.on_failure,
        "on_success": celery_handlers.on_success
    }
}

celery_handlers.py ファイルには以下が含まれています。

def on_failure(self, exc, task_id, *args, **kwargs):
    # Use urllib or similar to poke eg; api-int.mysite.com/task_handler/TASK_ID
    pass

def on_success(self, retval, task_id, *args, **kwargs):
    # Use urllib or similar to poke eg; api-int.mysite.com/task_handler/TASK_ID
    pass

そして、次のようなものを使用するように api-int をセットアップするだけです:

from celery.result import AsyncResult
task_obj = AsyncResult(task_id)
# Logic to handle task_obj.result and related goes here....
于 2013-08-13T12:50:39.383 に答える