13

私のコード (遺伝的最適化アルゴリズムの一部) は、いくつかのプロセスを並行して実行し、それらすべてが終了するのを待ち、出力を読み取り、別の入力で繰り返します。60回繰り返してテストしたところ、すべて正常に機能していました. うまくいったので、より現実的な繰り返し回数 200 を使用することにしました。次のエラーが表示されました。

File "/usr/lib/python2.7/threading.py", line 551, in __bootstrap_inner
 self.run()
File "/usr/lib/python2.7/threading.py", line 504, in run
 self.__target(*self.__args, **self.__kwargs)
File "/usr/lib/python2.7/multiprocessing/pool.py", line 302, in _handle_workers
 pool._maintain_pool()
File "/usr/lib/python2.7/multiprocessing/pool.py", line 206, in _maintain_pool
 self._repopulate_pool()
File "/usr/lib/python2.7/multiprocessing/pool.py", line 199, in _repopulate_pool
 w.start()
File "/usr/lib/python2.7/multiprocessing/process.py", line 130, in start
 self._popen = Popen(self)
File "/usr/lib/python2.7/multiprocessing/forking.py", line 120, in __init__
 self.pid = os.fork()
OSError: [Errno 12] Cannot allocate memory

プールを使用する私のコードのスニペットは次のとおりです。

def RunMany(inputs):
from multiprocessing import cpu_count, Pool
proc=inputs[0]
pool=Pool(processes = proc) 
results=[]
for arg1 in inputs[1]:
    for arg2 in inputs[2]:
        for arg3 in inputs[3]:
            results.append(pool.apply_async(RunOne, args=(arg1, arg2, arg3)))
casenum=0
datadict=dict()
for p in results:
    #get results of simulation once it has finished
    datadict[casenum]=p.get() 
    casenum+=1
return datadict

RunOne 関数は、私が作成したクラスにオブジェクトを作成し、計算負荷の高い Python パッケージを使用して、約 30 秒かかる化学の問題を解決し、化学ソルバーの出力でオブジェクトを返します。

したがって、私のコードは RunMany をシリアルで呼び出し、RunMany は RunOne をパラレルで呼び出します。私のテストでは、10 個のプロセッサ (コンピューターには 16 個あります) と RunOne への 20 回の呼び出しのプールを使用して RunOne を呼び出しました。つまり、len(arg1)*len(arg2)*len(arg3)=20 です。コードが RunMany を 60 回呼び出したときはすべて正常に機能しましたが、200 回呼び出したときにメモリが不足しました。

これは、一部のプロセスが正しくクリーンアップされていないことを意味しますか? メモリリークはありますか?メモリ リークがあるかどうかを判断するにはどうすればよいですか? また、リークの原因を特定するにはどうすればよいですか? 200 回の繰り返しループで増加している唯一の項目は、0 サイズから 200 の長さまで増加する数値のリストです。作成したカスタム クラスのオブジェクトの辞書がありますが、長さが制限されています。 50 エントリ - ループが実行されるたびに、ディクショナリから項目が削除され、別の項目に置き換えられます。

編集: RunMany を呼び出すコードのスニペットを次に示します。

for run in range(nruns):
    #create inputs object for RunMany using genetic methods. 
    #Either use starting "population" or create "child" inputs from successful previous runs
    datadict = RunMany(inputs)

    sumsquare=0
    for i in range(len(datadictsenk)): #input condition
        sumsquare+=Compare(datadict[i],Target[i]) #compare result to target

    with open(os.path.join(mainpath,'Outputs','output.txt'),'a') as f:
        f.write('\t'.join([str(x) for x in [inputs.name, sumsquare]])+'\n')

    Objective.append(sumsquare) #add sum of squares to list, to be plotted outside of loop
    population[inputs]=sumsquare #add/update the model in the "population", using the inputs object as a key, and it's objective function as the value
    if len(population)>initialpopulation:
        population = PopulationReduction(population) #reduce the "population" by "killing" unfit "genes"
    avgtime=(datetime.datetime.now()-starttime2)//(run+1)
    remaining=(nruns-run-1)*avgtime
    print(' Finished '+str(run+1)+' / ' +str(nruns)+'. Elapsed: '+str(datetime.datetime.now().replace(microsecond=0)-starttime)+' Remaining: '+str(remaining)+' Finish at '+str((datetime.datetime.now()+remaining).replace(microsecond=0))+'~~~', end="\r")
4

1 に答える 1

17

私の質問へのコメントに示されているように、答えは Puciek から来ました。

解決策は、終了後にプロセスのプールを閉じることでした。results変数が にローカルなので自動的に閉じられRunMany、完了後に削除されると思っていRunManyました。ただし、python は常に期待どおりに動作するとは限りません。

固定コードは次のとおりです。

def RunMany(inputs):
from multiprocessing import cpu_count, Pool
proc=inputs[0]
pool=Pool(processes = proc) 
results=[]
for arg1 in inputs[1]:
    for arg2 in inputs[2]:
        for arg3 in inputs[3]:
            results.append(pool.apply_async(RunOne, args=(arg1, arg2, arg3)))
#new section
pool.close()
pool.join()    
#end new section
casenum=0
datadict=dict()
for p in results:
    #get results of simulation once it has finished
    datadict[casenum]=p.get() 
    casenum+=1
return datadict
于 2014-11-03T22:25:44.673 に答える