0

以下のコードで非常に奇妙な問題が発生しました。numrows = 10プロセスループがそれ自体を完了し、終了に進むとき。増え続けるリストが大きくなると、デッドロックに陥ります。これはなぜですか、どうすればこれを解決できますか?

import multiprocessing, time, sys

# ----------------- Calculation Engine -------------------
def feed(queue, parlist):
    for par in parlist:
        queue.put(par)

def calc(queueIn, queueOut):
    while True:
        try:
            par = queueIn.get(block = False)
            print "Project ID: %s started. " % par
            res = doCalculation(par)
            queueOut.put(res)

        except:
            break

def write(queue, fname):
    print 'Started to write to file'
    fhandle = open(fname, "w")
    while True:
        try:
            res = queue.get(block = False)
            for m in res:
                print >>fhandle, m
        except:
            break
    fhandle.close()
    print 'Complete writing to the file'


def doCalculation(project_ID):
    numrows = 100
    toFileRowList = []

    for i in range(numrows):
        toFileRowList.append([project_ID]*100)
        print "%s %s" % (multiprocessing.current_process().name, i)

    return toFileRowList


def main():
    parlist     = [276, 266]

    nthreads    = multiprocessing.cpu_count()
    workerQueue = multiprocessing.Queue()
    writerQueue = multiprocessing.Queue()

    feedProc = multiprocessing.Process(target = feed , args = (workerQueue, parlist))
    calcProc = [multiprocessing.Process(target = calc , args = (workerQueue, writerQueue)) for i in range(nthreads)]
    writProc = multiprocessing.Process(target = write, args = (writerQueue, 'somefile.csv'))

    feedProc.start()
    feedProc.join ()

    for p in calcProc:
        p.start()
    for p in calcProc:
        p.join()

    writProc.start()
    writProc.join()

if __name__=='__main__':
    sys.exit(main())
4

2 に答える 2

1

feedProc と writeProc は、実際にはプログラムの残りの部分と並行して実行されていません。あなたが持っているとき

proc.start()
proc.join ()

プロセスを開始し、join()すぐに終了するのを待ちます。この場合、マルチプロセッシングにメリットはなく、オーバーヘッドのみです。参加する前に、すべてのプロセスを一度に開始するようにしてください。これにより、キューが定期的に空になり、デッドロックが発生しなくなります。

于 2012-06-07T15:15:29.503 に答える
1

問題はキューバッファがいっぱいになることだと思うので、追加のものを入れる前にキューから読み取る必要があります。たとえば、feedスレッドには次のものがあります。

queue.put(par)

読み取らずに多くのものを入れ続けると、バッファが解放されるまでブロックされますが、問題は、スレッドでバッファを解放するだけであり、ブロックスレッドcalcに参加する前に開始されないことです。feed

したがって、feedスレッドが終了するには、バッファを解放する必要がありますが、スレッドが終了する前にバッファは解放されません:)

キューへのアクセスを整理してみてください。

于 2012-06-07T14:57:36.460 に答える