10

次のように、チェーン内でグループ(またはチャンク)を使用したいと思います。

chain(getRange.s(3),  GROUP() , xsum.s() )

タスクGROUP()のグループはどこにありますか。同様の質問が、リストをグループに返すCeleryタスクをチェーンする方法に投稿されました。ただし、グループからの出力をチェーン内の次のタスクに渡す方法については説明されていません。double()group(double(0),double(1),double(2))

@task
def getRange(x):
    return range(x)

@task
def double(nr):
    return nr*2

@task
def xsum(list):
    return sum(list)
4

1 に答える 1

9

単一チェーンの現在のプリミティブでそれを行う方法があるとは思いません。あなたが言及した質問のようにコールバックを渡すと、グループタスクが終了したときに聞くことができなくなります。あなたが得ることができる最も近いものは次のようなものです:

@task
def get_range(x):
  return range(x)

@task
def mapper(nr):
  return nr * 2

@task
def reducer(nrs):
  return sum(nrs)

@task
def double_then_sum(nrs):
  return (
    group([mapper.s(nr) for nr in nrs]) |
    reducer.s()
  )()

ar = (get_range.s(3) | double_then_sum.s())() # call the procedure
ar.result.result # get the result

それ以外の場合は、より簡単なソリューションにつながるdynamic chainingmapを使用するか、グループ化されたタスクを並行して実行する必要がない場合に使用することができます。

于 2013-02-21T05:18:07.387 に答える