13

Celery Groupプリミティブを map/reduce ワークフローのアンブレラ タスクとして使用できますか?

またはより具体的には、グループ内のサブタスクを複数のサーバー上の複数のワーカーで実行できますか?

ドキュメントから:

However, if you call apply_async on the group it will send a special 
grouping task, so that the action of calling the tasks happens in a worker 
instead of the current process

これは、タスクがすべて 1 つのワーカーに送信されることを意味しているようです...

3.0 より前 (および現在も) は、複数のサーバーで実行される TaskSet でサブタスクを起動できました。問題は、すべてのタスクの実行が終了したかどうかを判断することです。これは通常、すべてのサブタスクをポーリングすることによって行われますが、これはあまりエレガントではありません。グループ プリミティブを使用してこの問題を軽減できるかどうか疑問に思っています。

4

1 に答える 1

27

このようなマップリデュースのような問題にChordsを使用できることがわかりました。

@celery.task(name='ic.mapper')
def mapper():
    #split your problem in embarrassingly parallel maps 
    maps = [map.s(), map.s(), map.s(), map.s(), map.s(), map.s(), map.s(), map.s()]
    #and put them in a chord that executes them in parallel and after they finish calls 'reduce'
    mapreduce = celery.chord(maps)(reduce.s())    
    return "{0} mapper ran on {1}".format(celery.current_task.request.id, celery.current_task.request.hostname)

@celery.task(name='ic.map')
def map():
    #do something useful here
    import time
    time.sleep(10.0)
    return "{0} map ran on {1}".format(celery.current_task.request.id, celery.current_task.request.hostname)

@celery.task(name='ic.reduce')
def reduce(results):
    #put the maps together and do something with the results
    return "{0} reduce ran on {1}".format(celery.current_task.request.id, celery.current_task.request.hostname)

マッパーが 3 つのワーカー/サーバーのクラスターで実行されると、最初にマッパーが実行され、問題が分割され、ブローカーに再度送信される新しいサブタスクが作成されます。キューはすべてのブローカーによって消費されるため、これらは並行して実行されます。また、すべてのマップをポーリングしてマップが終了したかどうかを確認する chord タスクが作成されます。完了すると、reduce タスクが実行され、結果を元に戻すことができます。

全体として:はい、可能です。野菜の人ありがとう!

于 2012-10-15T14:13:23.827 に答える