40

プログラムは次のとおりです。

#!/usr/bin/python

import multiprocessing

def dummy_func(r):
    pass

def worker():
    pass

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=16)
    for index in range(0,100000):
        pool.apply_async(worker, callback=dummy_func)

    # clean up
    pool.close()
    pool.join()

メモリ使用量 (VIRT と RES の両方) が close()/join() まで増加し続けていることがわかりました。これを取り除く解決策はありますか? 2.7 で maxtasksperchild を試しましたが、どちらも役に立ちませんでした。

apply_async() を ~6M 回呼び出すより複雑なプログラムがあり、~1.5M の時点で既に 6G+ RES を取得しています。他のすべての要因を回避するために、プログラムを上記のバージョンに単純化しました。

編集:

みんなの意見に感謝します。

#!/usr/bin/python

import multiprocessing

ready_list = []
def dummy_func(index):
    global ready_list
    ready_list.append(index)

def worker(index):
    return index

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=16)
    result = {}
    for index in range(0,1000000):
        result[index] = (pool.apply_async(worker, (index,), callback=dummy_func))
        for ready in ready_list:
            result[ready].wait()
            del result[ready]
        ready_list = []

    # clean up
    pool.close()
    pool.join()

メインプロセスはシングルスレッドであると信じているため、そこにロックを配置しませんでした(コールバックは、私が読んだドキュメントごとに多かれ少なかれイベント駆動型のものに似ています)。

v2 と同じように、v1 のインデックス範囲を 1,000,000 に変更し、いくつかのテストを行いました。v2 は v1 よりも最大 10% 高速です (33 秒と 37 秒)。v2 は間違いなくメモリ使用量の勝者です。300M (VIRT) と 50M (RES) を超えることはありませんでしたが、v1 は以前は 370M/120M で、最高は 330M/85M でした。すべての数値は 3 ~ 4 回のテストであり、参考値です。

4

6 に答える 6

6

処理中の非常に大きな 3D 点群データ セットがあります。multiprocessing モジュールを使用して処理を高速化しようとしましたが、メモリエラーが発生し始めました。いくつかの調査とテストの後、サブプロセスが空にするよりもはるかに速く、処理するタスクのキューをいっぱいにしていることがわかりました。チャンクするか、map_async などを使用して負荷を調整できたと確信していますが、周囲のロジックに大きな変更を加えたくありませんでした。

私が思いついた愚かな解決策は、pool._cache断続的に長さをチェックし、キャッシュが大きすぎる場合は、キューが空になるのを待つことです。

私のメインループには、すでにカウンターとステータス ティッカーがありました。

# Update status
count += 1
if count%10000 == 0:
    sys.stdout.write('.')
    if len(pool._cache) > 1e6:
        print "waiting for cache to clear..."
        last.wait() # Where last is assigned the latest ApplyResult

したがって、プールへの 10k の挿入ごとに、キューに 100 万を超える操作があるかどうかを確認します (メイン プロセスで約 1G のメモリが使用されます)。キューがいっぱいになると、最後に挿入されたジョブが完了するのを待ちます。

これで、私のプログラムはメモリ不足になることなく何時間も実行できます。ワーカーがデータの処理を続けている間、メイン プロセスは時々一時停止します。

ところで、_cache メンバーは、マルチプロセッシング モジュール プールの例に記載されています。

#
# Check there are no outstanding tasks
#

assert not pool._cache, 'cache = %r' % pool._cache
于 2014-01-14T18:51:39.273 に答える
2

これは私が投稿した質問に似ていると思いますが、同じ遅延があるかどうかはわかりません。私の問題は、マルチプロセッシング プールから結果を消費するよりも速く生成していたため、結果がメモリに蓄積されたことでした。それを避けるために、セマフォを使用してプールへの入力を抑制し、消費している出力よりも先に進みすぎないようにしました。

于 2016-12-02T00:55:36.833 に答える