7

1)現在、REST APIを公開し、DjangoとCeleryを使用してリクエストを処理して解決するWebアプリケーションに取り組んでいます。解決するためのリクエストの場合、一連のセロリタスクをamqpキューに送信して、ワーカー(他のマシンにある)で実行されるようにする必要があります。各タスクはCPUに非常に負荷がかかり、完了するまでに非常に長い(時間)かかります。

結果バックエンドとしてamqpも使用するようにCeleryを構成し、CeleryのブローカーとしてRabbitMQを使用しています。

各タスクは、後でDBに保存する必要がある結果を返しますが、ワーカーが直接保存する必要はありません。「中央ノード」(django-celeryを実行し、RabbitMQキューでタスクを公開するマシン)のみがこのストレージDBにアクセスできるため、ワーカーからの結果はこのマシンで何らかの形で返される必要があります。

問題は、後でタスク実行の結果をどのように処理できるかということです。したがって、ワーカーが終了すると、その結果は構成済みの結果バックエンド(amqp)に格納されますが、そこから結果を取得して処理するための最良の方法がわかりません。

ドキュメントで私が見つけたのは、次の方法で結果のステータスを時々確認できることだけです。

result.state

つまり、基本的に、このコマンドを定期的に実行する専用のコードが必要であるため、これだけでスレッド/プロセス全体をビジー状態に保つか、次のコマンドですべてをブロックします。

result.get()

タスクが終了するまで、それは私が望むことではありません。

私が考えることができる唯一の解決策は、「中央ノード」に、送信時に各タスクによって返されるasync_resultsを基本的にチェックする関数を定期的に実行する追加のスレッドを用意し、タスクのステータスが終了した場合にアクションを実行することです。

他に何か提案はありますか?

また、バックエンド結果の処理は「中央ノード」で行われるため、この操作による本機への影響を最小限に抑えることを目指しています。

それを行うための最良の方法は何でしょうか?

2)人々は通常、労働者から返された結果を処理し、バックエンドの結果を入れるという問題をどのように解決しますか?(バックエンドの結果が構成されていると仮定します)

4

1 に答える 1

2

あなたの質問を完全に理解しているかどうかはわかりませんが、各タスクにはタスク ID があることを考慮してください。タスクがユーザーによって送信されている場合は、ID を保存してから、次のように json を使用して結果を確認できます。

#urls.py 
from djcelery.views import is_task_successful

urlpatterns += patterns('',
    url(r'(?P<task_id>[\w\d\-\.]+)/done/?$', is_task_successful,
        name='celery-is_task_successful'),
    )

他の関連する概念は、終了した各タスクがシグナルを発するシグナルの概念です。終了したタスクは task_success シグナルを発行します。詳細については、リアルタイム procを参照してください。

于 2013-02-17T10:51:01.180 に答える