書き込み ( ) モードでファイルを開くたびw
に、新しいファイルが作成されます。そのため、ファイルが既に存在する場合、その内容は失われます。ファイルに正常に書き込むことができるのは、最後のファイル ハンドルだけです。これを追加モードに変更したとしても、複数のプロセスから同じファイルに書き込もうとしないでください。2 つのプロセスが同時に書き込もうとすると、出力が文字化けします。
代わりに、すべてのワーカー プロセスが出力をキューに入れ、単一の専用プロセス(サブプロセスまたはメイン プロセス) がキューからの出力を処理し、ファイルに書き込むようにします。
import multiprocessing as mp
import tables as pt
num_arrays = 100
num_processes = mp.cpu_count()
num_simulations = 1000
sentinel = None
def Simulation(inqueue, output):
for ii in iter(inqueue.get, sentinel):
output.put(('createGroup', ('/', 'A%s' % ii)))
for i in range(num_arrays):
output.put(('createArray', ('/A%s' % ii, 'B%s' % i, [ii, i])))
def handle_output(output):
hdf = pt.openFile('simulation.h5', mode='w')
while True:
args = output.get()
if args:
method, args = args
getattr(hdf, method)(*args)
else:
break
hdf.close()
if __name__ == '__main__':
output = mp.Queue()
inqueue = mp.Queue()
jobs = []
proc = mp.Process(target=handle_output, args=(output, ))
proc.start()
for i in range(num_processes):
p = mp.Process(target=Simulation, args=(inqueue, output))
jobs.append(p)
p.start()
for i in range(num_simulations):
inqueue.put(i)
for i in range(num_processes):
# Send the sentinal to tell Simulation to end
inqueue.put(sentinel)
for p in jobs:
p.join()
output.put(None)
proc.join()
比較のために、使用するバージョンを次に示しますmp.Pool
。
import multiprocessing as mp
import tables as pt
num_arrays = 100
num_processes = mp.cpu_count()
num_simulations = 1000
def Simulation(ii):
result = []
result.append(('createGroup', ('/', 'A%s' % ii)))
for i in range(num_arrays):
result.append(('createArray', ('/A%s' % ii, 'B%s' % i, [ii, i])))
return result
def handle_output(result):
hdf = pt.openFile('simulation.h5', mode='a')
for args in result:
method, args = args
getattr(hdf, method)(*args)
hdf.close()
if __name__ == '__main__':
# clear the file
hdf = pt.openFile('simulation.h5', mode='w')
hdf.close()
pool = mp.Pool(num_processes)
for i in range(num_simulations):
pool.apply_async(Simulation, (i, ), callback=handle_output)
pool.close()
pool.join()
もっとシンプルに見えませんか?ただし、1 つの大きな違いがあります。output.put
引数を送信するために使用された元のコードhandle_output
は、独自のサブプロセスで実行されていました。キューから取得し、handle_output
すぐに処理します。上記の Pool コードでは、大量のinが蓄積され、返されるまで送信されません。args
output
Simulation
args
result
result
handle_output
Simulation
時間がかかる場合Simulation
、何も書き込まれていない間、長い待ち時間が発生しsimulation.h5
ます。