6

質問

私はセロリを使用して、次のようなタスクセットを起動します。

  1. 並行して実行できるタスクのバッチを実行します。このバッチのタスクの数は、数万から数千までさまざまです。
  2. これらのタスクの結果を1つの回答に集約してから、この回答を使用して何かを実行します。たとえば、データベースに保存したり、特別な結果ファイルに保存したりします。基本的に、タスクの実行が完了した後、次のシグネチャを持つ関数を呼び出す必要があります。

    def callback(result_file_name, task_result_list): 
        #store in file
    
    
    def callback(entity_key, task_result_list):
        #store in db 
    

今のところ、ステップ1はセロリキューで実行され、ステップ2はセロリの外で実行されます。

    tasks = []

    # add taksks to tasks list 

    task_group = group()
    task_group.tasks = tasks

    result = task_group.apply_async()

    res = result.join()

    # Aggregate results 

    # Save results to file, database whatever

すべてのタスクが実行されるまで(数時間かかる場合があります)単一のスレッドを停止する必要があるため、このアプローチは面倒です。

どういうわけかステップ2をセロリにも移動したいと思います---基本的に、タスクセット全体にコールバックを追加するか(セロリでサポートされていないことがわかっている限り)、これらすべてのサブタスクの後に実行されるタスクを送信する必要があります。

誰かがそれを行う方法を知っていますか?django環境で使用しているので、データベースに状態を保存できます。

私の最近の調査結果を要約すると

コードはしません

コードを使用すると、次のように見えるコールバックを作成できるため、コードを直接使用することはできません。

    def callback(task_result_list): 
        #store in file

追加のパラメーターをコールバックに渡す明確な方法はありません(特に、これらのコールバックをローカル関数にすることはできないため)。

データベースを使用する

を使用して結果を保存できますTaskSetMetaが、このエンティティにはステータスフィールドがありません---したがって、TaskSetMetaにシグナルを追加する場合でも、かなりのオーバーヘッドが発生する可能性のあるタスク結果をプールする必要があります。

4

1 に答える 1

3

答えは本当に簡単で、コードを使用することもできます。追加のパラメーター(レポートファイル名など)をkwargsとして渡す必要があります。

コードタスクは次のとおりです。

@task
def print_and_sum(to_sum, file_name):
    print file_name
    print sum(to_sum)
    return file_name, sum(to_sum)

インスタンス化する方法は次のとおりです。

subtasks = [...]
result = chord(subtasks)(print_and_sum.subtask(kwargs={'file_name' : 'report_file.csv'}))
于 2012-06-22T13:41:46.217 に答える