21

multiprocessing.imap_unordered値のリストに対して計算を実行するために使用しています。

def process_parallel(fnc, some_list):
    pool = multiprocessing.Pool()
    for result in pool.imap_unordered(fnc, some_list):
        for x in result:
            yield x
    pool.terminate()

の各呼び出しfncは、設計上、結果として巨大なオブジェクトを返します。そのようなオブジェクトのN個のインスタンスをRAMに格納できます。ここで、N〜cpu_countですが、それ以上(数百ではない)ではありません。

現在、この関数を使用すると、メモリを大量に消費します。メモリは、ワーカーではなく、メインプロセスで完全に消費されます。

完成した結果をどのようにimap_unordered保存しますか?私は、労働者によってすでに返されているが、まだユーザーに渡されていない結果を意味します。私はそれが賢いと思い、必要に応じて「怠惰に」計算するだけでしたが、明らかにそうではありませんでした。

結果を十分に速く消費できないため、プールはこれらの巨大なオブジェクトを内部のどこかprocess_parallelからキューに入れ続け、その後爆発します。fncこれを回避する方法はありますか?どういうわけかその内部キューを制限しますか?


Python2.7を使用しています。乾杯。

4

2 に答える 2

12

対応するソースファイル(python2.7/multiprocessing/pool.py)を調べるとわかるように、IMapUnorderedIteratorはcollections.dequeインスタンスを使用して結果を保存します。新しいアイテムが入ってくると、そのアイテムは反復で追加および削除されます。

あなたが提案したように、メインスレッドがまだオブジェクトを処理している間に別の巨大なオブジェクトが入ってくると、それらもメモリに保存されます。

あなたが試みるかもしれないものはこのようなものです:

it = pool.imap_unordered(fnc, some_list)
for result in it:
    it._cond.acquire()
    for x in result:
        yield x
    it._cond.release()

これにより、アイテムが次のオブジェクトを両端キューに入れようとしている場合、アイテムの処理中にtask-result-receiver-threadがブロックされるはずです。したがって、メモリ内に2つを超える巨大なオブジェクトがあってはなりません。それがあなたの場合にうまくいくなら、私は知りません;)

于 2012-06-27T10:14:58.273 に答える
2

私が考えることができる最も簡単な解決策は、fnc関数をラップするクロージャーを追加することです。これは、セマフォを使用して、一度に実行できる同時ジョブ実行の総数を制御します(メインプロセス/スレッドはセマフォをインクリメントすると思います) )。セマフォ値は、ジョブのサイズと使用可能なメモリに基づいて計算できます。

于 2012-06-29T00:01:10.700 に答える