4

基本的に私は次のコードを持っています:

import multiprocessing
import time

class MyProcess(multiprocessing.Process):

    def __init__(self, ):
        multiprocessing.Process.__init__(self)
        self.queue = multiprocessing.Queue()

    def run(self):
        print "Subprocess starting!"
        time.sleep(4)
        print "Subprocess exiting!"

    def addToQueue(self):
        starttime = time.time()
        count=0
        print "Adding stuff to queue..."
        while time.time()-starttime  < 4:
            self.queue.put("string")
            count += 1
        print "Added %d objects!" % count

        #self.queue.close()


if __name__ == "__main__":
    process = MyProcess()
    process.start()
    print "Waiting for a while"
    time.sleep(2)
    process.addToQueue()
    time.sleep(1)
    print "Child process state: %d" % process.is_alive()

メインプロセスが終了しても、終了しません。何も起こりません、それはただブロックします。私がやめたのはそれを殺すことだけでした(SIGTERM、SIGKILLではありません)。

そのコメント行を使用すると、終了しますが、IOErrorが発行されます。

multiprocessing.queueのコードを調べたところ、別のスレッド(threading.Thread)で生成されたos.pipe()が使用されています。私が疑うのは、パイプへの書き込み時にスレッドがブロックされ、close()メソッドが使用されると、IOErrorが発生することです。

だから私の質問は:これを処理するためのよりクリーンな方法はありますか?

つまり、キューが常に書き込まれているこのシナリオがあります。受信プロセスが(クリーンかどうかにかかわらず)終了したら、キューを閉じて送信プロセスでIOErrorを取得する必要がありますか?

編集:プロセスの出力

Waiting for a while
Subprocess starting!
Adding stuff to queue...
Subprocess exiting!
Added 1822174 objects!
Traceback (most recent call last):
  File "/usr/lib/python2.7/multiprocessing/queues.py", line 266, in _feed
    send(obj)
IOError: [Errno 32] Broken pipe
Child process state: 0

この部分は、コメント化されたself.queue.close()を使用する場合にのみ発生します。

Traceback (most recent call last):
  File "/usr/lib/python2.7/multiprocessing/queues.py", line 266, in _feed
    send(obj)
IOError: [Errno 32] Broken pipe
4

1 に答える 1

11

誰もがコメントを読むわけではないので、私は自分の質問に答えています。コメントのユーザー mata からのヒントの後、time.sleep(0.01)キューにオブジェクトを追加するループ内に呼び出しを追加して、質問のサンプル コードをテストしたので、キューに追加されるオブジェクトの数を制限できました。

def addToQueue(self):
        starttime = time.time()
        count=0
        print "Adding stuff to queue..."
        while time.time()-starttime  < 4:
            self.queue.put("string")
            count += 1
            time.sleep(0.01)
        print "Added %d objects!" % count

そのため、オブジェクトの数が少ない場合 (この例では 3800 未満)、プロセスは正常に終了します。しかし、多くのオブジェクトがある場合、プロセス間のパイプで何らかのロックが発生しているようです。

しかし、それは私にとって別の疑問を提起します: これはバグですか? 私はそれを報告する必要がありますか?それとも、これは通常の予想される動作ですか?

この可能性を指摘してくれたユーザー mata に感謝します!

于 2012-05-17T15:14:40.417 に答える