3

マルチプロセッシングを行うために次のパターンを使用しています。

    for item in data:
        inQ.put(item)

    for i in xrange(nProcesses):
        inQ.put('STOP')
        multiprocessing.Process(target=worker, args=(inQ, outQ)).start()

    inQ.join()
    outQ.put('STOP')

    for result in iter(outQ.get, 'STOP'):
        # save result

これはうまくいきます。しかし、 を介して numpy 配列を送信するとoutQ'STOP'が の最後に終わらずoutQ、結果取得ループが早期に終了します。

これは、動作を再現するためのコードです。

import multiprocessing
import numpy as np

def worker(inQ, outQ):
    for i in iter(inQ.get, 'STOP'):
        result = np.random.rand(1,100)
        outQ.put(result)
        inQ.task_done()
    inQ.task_done() # for the 'STOP'

def main():
    nProcesses = 8
    data = range(1000)

    inQ = multiprocessing.JoinableQueue()
    outQ = multiprocessing.Queue()
    for item in data:
        inQ.put(item)

    for i in xrange(nProcesses):
        inQ.put('STOP')
        multiprocessing.Process(target=worker, args=(inQ, outQ)).start()

    inQ.join()
    print outQ.qsize()
    outQ.put('STOP')

    cnt = 0
    for result in iter(outQ.get, 'STOP'):
        cnt += 1
    print "got %d items" % cnt
    print outQ.qsize()

if __name__ == '__main__':
    main()

result = np.random.rand(1,100)を次のようなものに置き換えるとresult = i*i、コードは期待どおりに機能します。

ここで何が起きてるの?ここで根本的に間違ったことをしていますか?すべてのプロセスまでのブロックがすべてoutQ.put()のs.inQ.join()join()put()

私のために働く回避策は、結果をフェッチするループをwhile outQ.qsize() > 0で実行することです。しかし、私が読んだことqsize()は信頼できません。信頼できないのは、さまざまなプロセスが実行されている間だけですか? qsize()を行った後、私が頼りになるのは安全inQ.join()ですか?

を使用することを提案する人もいるとmultiprocessing.Pool.map()思いますが、numpy 配列 (ndarrays) でそれを行うと、pickle エラーが発生します。

ご覧いただきありがとうございます。

4

2 に答える 2

1

numpy 配列は豊富な比較を使用します。したがって、a=='STOP' はブール値ではなく numpy 配列を返し、その numpy 配列をブール値に強制することはできません。内部では、 iter(outQ.get, 'STOP') はまさにその比較を行っており、結果を bool に変換しようとすると、おそらく False として例外を処理しています。'STOP' と比較する前に、手動の while ループを実行し、キューからアイテムをプルし、isinstance(item, basestring) かどうかを確認する必要があります。

while True:
    item = outQ.get()
    if isinstance(item, basestring) and item == 'STOP':
        break
    cnt += 1

入力キューが結合された後、他のプロセスがキューに追加されていないため、qsize() のチェックもおそらく正常に機能します。

于 2011-03-16T14:35:30.827 に答える
1

から期待されるアイテムの数がわかっているのでoutQ、別の回避策は、その数のアイテムを明示的に待つことです。

import multiprocessing as mp
import numpy as np
import Queue

N=100

def worker(inQ, outQ):
    while True:
        i,item=inQ.get()
        result = np.random.rand(1,N)
        outQ.put((i,result))
        inQ.task_done()

def main():
    nProcesses = 8
    data = range(N)
    inQ = mp.JoinableQueue()
    outQ = mp.Queue()    

    for i,item in enumerate(data):
        inQ.put((i,item))

    for i in xrange(nProcesses):
        proc=mp.Process(target=worker, args=[inQ, outQ])
        proc.daemon=True
        proc.start()

    inQ.join()
    cnt=0
    for _ in range(N):
        result=outQ.get()
        print(result)
        cnt+=1
        print(cnt)      
    print('got {c} items'.format(c=cnt))

if __name__ == '__main__':
    main()
于 2011-03-16T21:02:51.577 に答える