24

私はmultiprocessing.Pool()いくつかの重い計算を並列化するために使用しています。

ターゲット関数は大量のデータ (巨大なリスト) を返します。RAMが不足しています。

がなければ、計算された結果の要素を次々と ing することmultiprocessingで、ターゲット関数をジェネレーターに変更するだけです。yield

マルチプロセッシングはジェネレーターをサポートしていないことを理解しています.出力全体を待って一度に返しますよね?譲歩なし。Pool結果配列全体をRAMに構築せずに、ワーカーが利用可能になるとすぐにデータを生成する方法はありますか?

簡単な例:

def target_fnc(arg):
   result = []
   for i in xrange(1000000):
       result.append('dvsdbdfbngd') # <== would like to just use yield!
   return result

def process_args(some_args):
    pool = Pool(16)
    for result in pool.imap_unordered(target_fnc, some_args):
        for element in result:
            yield element

これは Python 2.7 です。

4

3 に答える 3

17

これはキューの理想的な使用例のように思えます: http://docs.python.org/2/library/multiprocessing.html#exchange-objects-between-processes

結果をプールされたワーカーからキューにフィードし、マスターに取り込むだけです。

ワーカーがキューにデータを入力するのとほぼ同じ速さでキューを排出しない限り、メモリ不足の問題が発生する可能性があることに注意してください。キューのサイズ (キューに収まるオブジェクトの最大数) を制限できます。この場合、プールされたワーカーはqueue.put、キューにスペースが空くまでステートメントをブロックします。これにより、メモリ使用量に上限が設定されます。 しかし、これを行っている場合は、プーリングが必要かどうか、および/またはより少ないワーカーを使用することに意味があるかどうかを再検討する時期かもしれません。

于 2013-02-03T21:21:07.723 に答える
4

あなたの説明から、100万要素を返すことを避けるのと同じように、データが入ってくるのでデータを処理することにそれほど興味がないように思えますlist

これを行う簡単な方法があります。データをファイルに入れるだけです。例えば:

def target_fnc(arg):
    fd, path = tempfile.mkstemp(text=True)
    with os.fdopen(fd) as f:
        for i in xrange(1000000):
            f.write('dvsdbdfbngd\n')
    return path

def process_args(some_args):
    pool = Pool(16)
    for result in pool.imap_unordered(target_fnc, some_args):
        with open(result) as f:
            for element in f:
                yield element

明らかに、結果に改行が含まれる可能性がある場合、または文字列などではない場合は、単純なテキストファイルの代わりにcsvファイル、などを使用することをお勧めしますが、考え方は同じです。numpy

そうは言っても、これが単純な場合でも、データを一度に1チャンクで処理することには通常利点があるためQueue、欠点(それぞれ、タスクを分割する方法が必要な場合、またはデータが生成されるのと同じ速さでデータを消費できる必要がある場合)は、取引を妨げるものではありません。

于 2013-02-03T22:46:28.430 に答える
3

タスクがチャンクでデータを返すことができる場合、それぞれが 1 つのチャンクを返す小さなタスクに分割できますか? 明らかに、これは常に可能であるとは限りません。そうでない場合は、他のメカニズムを使用する必要があります ( QueueLoren Abrams が示唆するように)。しかし、それの場合、この問題を解決するだけでなく、他の理由からおそらくより良い解決策です。

あなたの例では、これは確かに実行可能です。例えば:

def target_fnc(arg, low, high):
   result = []
   for i in xrange(low, high):
       result.append('dvsdbdfbngd') # <== would like to just use yield!
   return result

def process_args(some_args):
    pool = Pool(16)
    pool_args = []
    for low in in range(0, 1000000, 10000):
        pool_args.extend(args + [low, low+10000] for args in some_args)
    for result in pool.imap_unordered(target_fnc, pool_args):
        for element in result:
            yield element

zip(もちろん、ループをネストされた内包表記またはandに置き換えることもできflattenます。)

したがって、 が の場合some_args[1, 2, 3]300 個のタスクが得られます — <code>[[1, 0, 10000], [2, 0, 10000], [3, 0, 10000], [1, 10000, 20000], …] 、それぞれが 1000000 ではなく 10000 要素のみを返します。

于 2013-02-03T22:33:24.693 に答える