9

これは単純な問題のように思えますが、私はそれを理解することができません。

double for ループで実行し、結果を HDF ファイルに書き込むシミュレーションがあります。このプログラムの簡単なバージョンを以下に示します。

import tables as pt

a = range(10)
b = range(5)

def Simulation():
    hdf = pt.openFile('simulation.h5',mode='w')
    for ii in a:
        print(ii)
        hdf.createGroup('/','A%s'%ii)
        for i in b:
            hdf.createArray('/A%s'%ii,'B%s'%i,[ii,i])
        hdf.close()
    return
Simulation()

このコードはまさに私が望むことを行いますが、プロセスの実行にはかなりの時間がかかる可能性があるため、マルチプロセッシング モジュールを使用して次のコードを使用しようとしました。

import multiprocessing
import tables as pt

a = range(10)
b = range(5)

def Simulation(ii):
    hdf = pt.openFile('simulation.h5',mode='w')
    print(ii)
        hdf.createGroup('/','A%s'%ii)
        for i in b:
            hdf.createArray('/A%s'%ii,'B%s'%i,[ii,i])
        hdf.close()
    return

if __name__ == '__main__':
    jobs = []
    for ii in a:
        p = multiprocessing.Process(target=Simulation, args=(ii,))
        jobs.append(p)       
        p.start()

ただし、これは最後のシミュレーションを HDF ファイルに出力するだけで、何らかの形で他のすべてのグループを上書きします。

4

1 に答える 1

16

書き込み ( ) モードでファイルを開くたびwに、新しいファイルが作成されます。そのため、ファイルが既に存在する場合、その内容は失われます。ファイルに正常に書き込むことができるのは、最後のファイル ハンドルだけです。これを追加モードに変更したとしても、複数のプロセスから同じファイルに書き込もうとしないでください。2 つのプロセスが同時に書き込もうとすると、出力が文字化けします。

代わりに、すべてのワーカー プロセスが出力をキューに入れ、単一の専用プロセス(サブプロセスまたはメイン プロセス) がキューからの出力を処理し、ファイルに書き込むようにします。


import multiprocessing as mp
import tables as pt


num_arrays = 100
num_processes = mp.cpu_count()
num_simulations = 1000
sentinel = None


def Simulation(inqueue, output):
    for ii in iter(inqueue.get, sentinel):
        output.put(('createGroup', ('/', 'A%s' % ii)))
        for i in range(num_arrays):
            output.put(('createArray', ('/A%s' % ii, 'B%s' % i, [ii, i])))


def handle_output(output):
    hdf = pt.openFile('simulation.h5', mode='w')
    while True:
        args = output.get()
        if args:
            method, args = args
            getattr(hdf, method)(*args)
        else:
            break
    hdf.close()

if __name__ == '__main__':
    output = mp.Queue()
    inqueue = mp.Queue()
    jobs = []
    proc = mp.Process(target=handle_output, args=(output, ))
    proc.start()
    for i in range(num_processes):
        p = mp.Process(target=Simulation, args=(inqueue, output))
        jobs.append(p)
        p.start()
    for i in range(num_simulations):
        inqueue.put(i)
    for i in range(num_processes):
        # Send the sentinal to tell Simulation to end
        inqueue.put(sentinel)
    for p in jobs:
        p.join()
    output.put(None)
    proc.join()

比較のために、使用するバージョンを次に示しますmp.Pool

import multiprocessing as mp
import tables as pt


num_arrays = 100
num_processes = mp.cpu_count()
num_simulations = 1000


def Simulation(ii):
    result = []
    result.append(('createGroup', ('/', 'A%s' % ii)))
    for i in range(num_arrays):
        result.append(('createArray', ('/A%s' % ii, 'B%s' % i, [ii, i])))
    return result


def handle_output(result):
    hdf = pt.openFile('simulation.h5', mode='a')
    for args in result:
        method, args = args
        getattr(hdf, method)(*args)
    hdf.close()


if __name__ == '__main__':
    # clear the file
    hdf = pt.openFile('simulation.h5', mode='w')
    hdf.close()
    pool = mp.Pool(num_processes)
    for i in range(num_simulations):
        pool.apply_async(Simulation, (i, ), callback=handle_output)
    pool.close()
    pool.join()

もっとシンプルに見えませんか?ただし、1 つの大きな違いがあります。output.put引数を送信するために使用された元のコードhandle_outputは、独自のサブプロセスで実行されていました。キューから取得し、handle_outputすぐに処理します。上記の Pool コードでは、大量のinが蓄積され、返されるまで送信されません。argsoutputSimulationargsresultresulthandle_outputSimulation

時間がかかる場合Simulation、何も書き込まれていない間、長い待ち時間が発生しsimulation.h5ます。

于 2013-03-29T13:33:36.050 に答える