13

ここで概説した状況と似ていますが、複数の引数を持つタスクを連鎖させるのではなく、複数のエントリを持つ辞書を返すタスクを連鎖させたいという点が異なります。

これは-非常に大まかに抽象的に---私がやろうとしていることです:

tasks.py

@task()
def task1(item1=None, item2=None):
  item3 = #do some stuff with item1 and item2 to yield item3
  return_object = dict(item1=item1, item2=item2, item3=item3)
  return return_object

def task2(item1=None, item2=None, item3=None):
  item4 = #do something with item1, item2, item3 to yield item4
  return_object = dict(item1=item1, item2=item2, item3=item3, item4=item4)
  return return_object

ipythonを使用して、task1を個別に非同期で呼び出すことができます。問題はありません。

また、task2を個別に呼び出して、task1によって二重星の引数として返される結果を返すこともできます。

>>res1 = task1.s(item1=something, item2=something_else).apply_async()
>>res1.status
'SUCCESS'
>>res2 = task2.s(**res1.result).apply_async()
>>res2.status
'SUCCESS

ただし、最終的に達成したいのは上記と同じ最終結果ですが、チェーンを介して、ここでは、task1によって返される(位置)引数ではなく、task1.resultを使用してtask2をインスタンス化する方法を理解できません。 ** kwargs:

chain_result = (task1.s(item1=something, item2=something_else) | task2.s()).apply_async()  #THIS DOESN'T WORK!

辞書の代わりに位置引数を返すようにタスクに戻って書き直すことができると思います。これで問題が解決する可能性がありますが、task2のtask1のreturnオブジェクトに同等の方法でアクセスする方法があるはずです。 **ダブルスターの機能。また、Celeryサブタスクの実装または*argsと**kwargsのどちらかについてかなり明白な何かがここに欠けているのではないかと思います。

これが理にかなっていることを願っています。そして、ヒントを事前に感謝します。

4

3 に答える 3

2

chainその他のキャンバスプリミティブは、mapやなどの機能ユーティリティのファミリーに含まれていますreduce

たとえば、リスト内のすべてのアイテムをmap(target, items)呼び出す場合、Pythonにはめったに使用されないバージョンのmapがあり、代わりに。を呼び出します。target(item)itertools.starmaptarget(*item)

ツールボックスに追加することstarchainもできkwstarchainますが、これらは非常に特殊化されており、おそらくそれほど頻繁には使用されません。

興味深いことに、Pythonはリスト式とジェネレーター式でこれらを不要にしているため、mapはに置き換えられ[target(item) for item in item]、starmapはに置き換えられてい[target(*item) for item in item]ます。

したがって、プリミティブごとにいくつかの選択肢を実装するのではなく、これをサポートするためのより柔軟な方法を見つけることに焦点を当てる必要があると思います。

于 2013-02-20T13:12:17.727 に答える
1

これはセロリに組み込まれていないため、デコレータ関数を自分で似たようなものに書きました。

# Use this wrapper with functions in chains that return a tuple. The
# next function in the chain will get called with that the contents of
# tuple as (first) positional args, rather than just as just the first
# arg. Note that both the sending and receiving function must have
# this wrapper, which goes between the @task decorator and the
# function definition. This wrapper should not otherwise interfere
# when these conditions are not met.

class UnwrapMe(object):
    def __init__(self, contents):
        self.contents = contents

    def __call__(self):
        return self.contents

def wrap_for_chain(f):
    """ Too much deep magic. """
    @functools.wraps(f)
    def _wrapper(*args, **kwargs):
        if type(args[0]) == UnwrapMe:
            args = list(args[0]()) + list(args[1:])
        result = f(*args, **kwargs)

        if type(result) == tuple and current_task.request.callbacks:
            return UnwrapMe(result)
        else:
            return result
    return _wrapper

私のものはstarchainコンセプトのようにアンラップしますが、代わりにクワーグをアンラップするように簡単に変更できます。

于 2013-04-03T03:05:55.450 に答える