4

次のスクリプトは、サイズ 100000 の 100 個のランダムな辞書を生成し、各 (キー、値) タプルをキューに送り、1 つの別のプロセスがキューから読み取ります。

import multiprocessing as mp

import numpy.random as nr


def get_random_dict(_dummy):
    return dict((k, v) for k, v in enumerate(nr.randint(pow(10, 9), pow(10, 10), pow(10, 5))))

def consumer(q):
    for (k, v) in iter(q.get, 'STOP'):
        pass

q = mp.Queue()
p = mp.Process(target=consumer, args=(q,))
p.start()
for d in mp.Pool(1).imap_unordered(get_random_dict, xrange(100)):
    for k, v in d.iteritems():
        q.put((k, v))
q.put('STOP')
p.join()

メインプロセスがフィードするときにコンシューマープロセスがキューからデータをプルするため、メモリ使用量が一定であると予想していました。キューにデータが溜まらないことを確認しました。

ただし、メモリ消費量を監視したところ、スクリプトが実行されるにつれて増加し続けています。に置き換えるimap_unorderedfor _ in xrange(100): d = get_random_dict()、メモリ消費量は一定になります。説明は何ですか?

4

2 に答える 2

1

Pool.imapは と文字通り同一ではありませんimapimapのように使用でき、イテレータを返すという点では同じです。ただし、実装はまったく異なります。バッキング プールは、与えられたすべてのジョブをできるだけ早く完了するために、イテレータがどれだけ速く消費されるかに関係なく、できる限り懸命に働きます。要求されたときにのみジョブを処理したい場合は、 を使用しても意味がありませんmultiprocessing。そのまま使用itertools.imapして終了することもできます。

したがって、メモリ消費量が増加している理由は、コンシューマー プロセスが辞書を消費するよりもプールが高速に辞書を作成しているためです。これは、プールがワーカー プロセスから結果を取得する方法が単方向 (1 つのプロセスが書き込み、プロセスが読み取る) であるため、明示的な同期メカニズムが必要ないためです。一方、aQueueは双方向です。両方のプロセスがキューの読み取りと書き込みを行うことができます。これは、次の項目をキューに追加したり、キューから項目を削除したりするために競合していないことを確認するために、キューを使用するプロセス間で明示的な同期が必要であることを意味します (したがって、キューは一貫性のない状態のままになります)。

于 2014-11-07T15:53:46.633 に答える