質問
私はセロリを使用して、次のようなタスクセットを起動します。
- 並行して実行できるタスクのバッチを実行します。このバッチのタスクの数は、数万から数千までさまざまです。
これらのタスクの結果を1つの回答に集約してから、この回答を使用して何かを実行します。たとえば、データベースに保存したり、特別な結果ファイルに保存したりします。基本的に、タスクの実行が完了した後、次のシグネチャを持つ関数を呼び出す必要があります。
def callback(result_file_name, task_result_list): #store in file def callback(entity_key, task_result_list): #store in db
今のところ、ステップ1はセロリキューで実行され、ステップ2はセロリの外で実行されます。
tasks = []
# add taksks to tasks list
task_group = group()
task_group.tasks = tasks
result = task_group.apply_async()
res = result.join()
# Aggregate results
# Save results to file, database whatever
すべてのタスクが実行されるまで(数時間かかる場合があります)単一のスレッドを停止する必要があるため、このアプローチは面倒です。
どういうわけかステップ2をセロリにも移動したいと思います---基本的に、タスクセット全体にコールバックを追加するか(セロリでサポートされていないことがわかっている限り)、これらすべてのサブタスクの後に実行されるタスクを送信する必要があります。
誰かがそれを行う方法を知っていますか?django環境で使用しているので、データベースに状態を保存できます。
私の最近の調査結果を要約すると
コードはしません
コードを使用すると、次のように見えるコールバックを作成できるため、コードを直接使用することはできません。
def callback(task_result_list):
#store in file
追加のパラメーターをコールバックに渡す明確な方法はありません(特に、これらのコールバックをローカル関数にすることはできないため)。
データベースを使用する
を使用して結果を保存できますTaskSetMeta
が、このエンティティにはステータスフィールドがありません---したがって、TaskSetMetaにシグナルを追加する場合でも、かなりのオーバーヘッドが発生する可能性のあるタスク結果をプールする必要があります。