ここで概説した状況と似ていますが、複数の引数を持つタスクを連鎖させるのではなく、複数のエントリを持つ辞書を返すタスクを連鎖させたいという点が異なります。
これは-非常に大まかに抽象的に---私がやろうとしていることです:
tasks.py
@task()
def task1(item1=None, item2=None):
item3 = #do some stuff with item1 and item2 to yield item3
return_object = dict(item1=item1, item2=item2, item3=item3)
return return_object
def task2(item1=None, item2=None, item3=None):
item4 = #do something with item1, item2, item3 to yield item4
return_object = dict(item1=item1, item2=item2, item3=item3, item4=item4)
return return_object
ipythonを使用して、task1を個別に非同期で呼び出すことができます。問題はありません。
また、task2を個別に呼び出して、task1によって二重星の引数として返される結果を返すこともできます。
>>res1 = task1.s(item1=something, item2=something_else).apply_async()
>>res1.status
'SUCCESS'
>>res2 = task2.s(**res1.result).apply_async()
>>res2.status
'SUCCESS
ただし、最終的に達成したいのは上記と同じ最終結果ですが、チェーンを介して、ここでは、task1によって返される(位置)引数ではなく、task1.resultを使用してtask2をインスタンス化する方法を理解できません。 ** kwargs:
chain_result = (task1.s(item1=something, item2=something_else) | task2.s()).apply_async() #THIS DOESN'T WORK!
辞書の代わりに位置引数を返すようにタスクに戻って書き直すことができると思います。これで問題が解決する可能性がありますが、task2のtask1のreturnオブジェクトに同等の方法でアクセスする方法があるはずです。 **ダブルスターの機能。また、Celeryサブタスクの実装または*argsと**kwargsのどちらかについてかなり明白な何かがここに欠けているのではないかと思います。
これが理にかなっていることを願っています。そして、ヒントを事前に感謝します。