7

マルチプロセッシングから単一ファイルへの結果の書き込みに関連するスタック交換に関する多くの投稿があることを知っており、それらの投稿のみを読んだ後にコードを開発しました。私が達成しようとしているのは、「RevMapCoord」関数を並行して実行し、その結果を multiprocess.queue を使用して 1 つのファイルに書き込むことです。しかし、仕事をキューに入れているときに問題が発生しています。私のコード:

def RevMapCoord(list):
    "Read a file, Find String and Do something"

def feed(queue, parlist):
    for par in parlist:
        print ('Echo from Feeder: %s' % (par))
        queue.put(par)
    print ('**Feeder finished queing**')

def calc(queueIn, queueOut):
     print ('Worker function started')
     while True:
         try:
             par = queueIn.get(block = False)
             res = RevMapCoord(final_res)
             queueOut.put((par,res))
         except:
             break

def write(queue, fname):
    fhandle = open(fname, "w")
    while True:
         try:
            par, res = queue.get(block = False)
            print >>fhandle, par, res
         except:
            break
    fhandle.close()


feedProc = Process(target = feed , args = (workerQueue, final_res))
calcProc = [Process(target = calc , args = (workerQueue, writerQueue)) for i in range(nproc)]
writProc = Process(target = write, args = (writerQueue, sco_inp_extend_geno))

feedProc.start()
print ('Feeder is joining')
feedProc.join ()
for p in calcProc:
    p.start()
for p in calcProc:
    p.join()
writProc.start()
writProc.join ()

このコード スクリプトを実行すると、"feedProc.start()" ステップでスタックします。画面からの最後の数行の出力は、「feedProc.start()」の終わりからの print ステートメントを示しています。

Echo from Feeder: >AK779,AT61680,50948-50968,50959,6,0.406808,Ashley,Dayne
Echo from Feeder: >AK832,AT30210,1091-1111,1102,7,0.178616,John,Caine
**Feeder finished queing**

しかし、次の行「feedProc.join ()」を実行する前にハングします。コードはエラーを出さず、実行を続けますが、何もしません (ハングします)。私が犯している間違いを教えてください。

4

2 に答える 2

11

例を基本にスリム化する必要があると思います。例えば:

from multiprocessing import Process, Queue

def f(q):
    q.put('Hello')
    q.put('Bye')
    q.put(None)

if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    with open('file.txt', 'w') as fp:
        while True:
            item = q.get()
            print(item)
            if item is None:
                break
            fp.write(item)
    p.join()

ここには 2 つのプロセス (メイン プロセス、ap) があります。p は、メイン プロセスによって取得される文字列をキューに入れます。メインプロセスが None を見つけると (私が示すために使用している歩哨: 「完了しました」と、ループが中断されます。

これを多くのプロセス (またはスレッド) に拡張するのは簡単です。

于 2013-03-20T18:26:04.693 に答える
0

Python3 の「map_async」関数を使用して、マルチプロセッシングの結果を単一のファイルに書き込むことを実現しました。ここに私が書いた関数があります:

def PPResults(module,alist):##Parallel processing
    npool = Pool(int(nproc))    
    res = npool.map_async(module, alist)
    results = (res.get())###results returned in form of a list 
    return results

したがって、この関数には「a_list」にパラメーターのリストを指定し、「module」は処理を実行して結果を返す関数です。上記の関数は、リストの形式で結果を収集し続け、「a_list」からのすべてのパラメーターが処理されると戻ります。結果は正しい順序ではないかもしれませんが、私にとって順序は重要ではなかったので、これはうまくいきました. 「結果」リストは反復でき、個々の結果は次のようにファイルに書き込まれます。

fh_out = open('./TestResults', 'w')
for i in results:##Write Results from list to file
    fh_out.write(i)

結果の順序を維持するには、上記の質問で述べたのと同様の「キュー」を使用する必要がある場合があります。コードを修正することはできますが、ここで言及する必要はないと思います。

ありがとう

AK

于 2013-03-22T22:09:44.120 に答える