32

Celery タスクによって返されたリストからグループを作成して、タスク結果セットの各項目に対して 1 つのタスクがグループに追加されるようにしたいと考えています。

ユースケースを説明する簡単なコード例を次に示します。は???、前のタスクの結果である必要があります。

@celery.task
def get_list(amount):
    # In reality, fetch a list of items from a db
    return [i for i in range(amount)]

@celery.task
def process_item(item):
    #do stuff
    pass

process_list = (get_list.s(10) | group(process_item.s(i) for i in ???))

私はおそらくこれに正しくアプローチしていませんが、タスク内からタスクを呼び出すのは安全ではないと確信しています:

@celery.task
def process_list():
    for i in get_list.delay().get():
        process_item.delay(i)

seconds タスクの結果は必要ありません。

4

1 に答える 1

46

この種の動作は、中間タスクを使用して取得できます。これは、あなたが提案したように機能する「マップ」のようなメソッドを作成するデモです。

from celery import task, subtask, group

@task
def get_list(amount):
    return [i for i in range(amount)]

@task
def process_item(item):
    # do stuff
    pass

@task
def dmap(it, callback):
    # Map a callback over an iterator and return as a group
    callback = subtask(callback)
    return group(callback.clone([arg,]) for arg in it)()

# runs process_item for each item in the return of get_list 
process_list = (get_list.s(10) | dmap.s(process_item.s()))

私が同様の問題について彼に助けを求めたときに、この提案をしてくれたAsk Solemの功績.

于 2012-11-26T17:20:55.330 に答える