マルチプロセッシングを行うために次のパターンを使用しています。
for item in data:
inQ.put(item)
for i in xrange(nProcesses):
inQ.put('STOP')
multiprocessing.Process(target=worker, args=(inQ, outQ)).start()
inQ.join()
outQ.put('STOP')
for result in iter(outQ.get, 'STOP'):
# save result
これはうまくいきます。しかし、 を介して numpy 配列を送信するとoutQ
、'STOP'
が の最後に終わらずoutQ
、結果取得ループが早期に終了します。
これは、動作を再現するためのコードです。
import multiprocessing
import numpy as np
def worker(inQ, outQ):
for i in iter(inQ.get, 'STOP'):
result = np.random.rand(1,100)
outQ.put(result)
inQ.task_done()
inQ.task_done() # for the 'STOP'
def main():
nProcesses = 8
data = range(1000)
inQ = multiprocessing.JoinableQueue()
outQ = multiprocessing.Queue()
for item in data:
inQ.put(item)
for i in xrange(nProcesses):
inQ.put('STOP')
multiprocessing.Process(target=worker, args=(inQ, outQ)).start()
inQ.join()
print outQ.qsize()
outQ.put('STOP')
cnt = 0
for result in iter(outQ.get, 'STOP'):
cnt += 1
print "got %d items" % cnt
print outQ.qsize()
if __name__ == '__main__':
main()
result = np.random.rand(1,100)
を次のようなものに置き換えるとresult = i*i
、コードは期待どおりに機能します。
ここで何が起きてるの?ここで根本的に間違ったことをしていますか?すべてのプロセスまでのブロックがすべてoutQ.put()
のs.inQ.join()
join()
put()
私のために働く回避策は、結果をフェッチするループをwhile outQ.qsize() > 0
で実行することです。しかし、私が読んだことqsize()
は信頼できません。信頼できないのは、さまざまなプロセスが実行されている間だけですか? qsize()
を行った後、私が頼りになるのは安全inQ.join()
ですか?
を使用することを提案する人もいるとmultiprocessing.Pool.map()
思いますが、numpy 配列 (ndarrays) でそれを行うと、pickle エラーが発生します。
ご覧いただきありがとうございます。