5

最近、を使用して親子接続オブジェクトのペアを作成し、パイプを介して送信しようとしてmultiprocessing.Pipeいるオブジェクトobjが大きすぎる場合、例外をスローしたり、何もしなかったりせずにプログラムがハングすることがわかりました。 。以下のコードを参照してください。(以下のコードは、numpyパッケージを使用してフロートの大規模な配列を生成します。)

import multiprocessing as mp
import numpy as np

def big_array(conn, size=1200):
    a = np.random.rand(size)
    print "Child process trying to send array of %d floats." %size
    conn.send(a)
    return a

if __name__ == "__main__":
    print "Main process started."
    parent_conn, child_conn = mp.Pipe()
    proc = mp.Process(target=big_array, args=[child_conn, 1200])
    proc.start()
    print "Child process started."
    proc.join()
    print "Child process joined."
    a = parent_conn.recv()
    print "Received the following object."
    print "Type: %s. Size: %d." %(type(a), len(a))

出力は次のとおりです。

Main process started.
Child process started.
Child process trying to send array of 1200 floats.

そして、それはここに無期限にぶら下がっています。ただし、1200ではなく1000のfloatを含む配列を送信しようとすると、プログラムは正常に実行され、期待どおりに次の出力が得られます。

Main process started.
Child process started.
Child process trying to send array of 1000 floats.
Child process joined.
Received the following object.
Type: <type 'numpy.ndarray'>. Size: 1000.
Press any key to continue . . .

これは私にはバグのように見えます。ドキュメントには次のように書かれています。

send(obj)接続のもう一方の端にオブジェクトを送信します。このオブジェクトはrecv()を使用して読み取る必要があります。

オブジェクトは選択可能でなければなりません。非常に大きなピクルス(OSによって異なりますが、約32 MB以上)は、ValueError例外を発生させる可能性があります。

しかし、私の実行では、ValueError例外もスローされず、プログラムはそこでハングします。さらに、1200の長さのnumpy配列は9600バイトの大きさで、確かに32MB以下です。これはバグのようです。誰かがこの問題を解決する方法を知っていますか?

ちなみに、私はWindows 7、64ビットを使用しています。

4

1 に答える 1

16

join()下に移動してみてくださいrecv()

import multiprocessing as mp

def big_array(conn, size=1200):
    a = "a" * size
    print "Child process trying to send array of %d floats." %size
    conn.send(a)
    return a

if __name__ == "__main__":
    print "Main process started."
    parent_conn, child_conn = mp.Pipe()
    proc = mp.Process(target=big_array, args=[child_conn, 120000])
    proc.start()
    print "Child process started."
    print "Child process joined."
    a = parent_conn.recv()
    proc.join()
    print "Received the following object."
    print "Type: %s. Size: %d." %(type(a), len(a))

しかし、あなたの例が小さなサイズでも機能する理由はよくわかりません。パイプに書き込んでから、最初にパイプからデータを読み取らずに結合するプロセスを作成すると、結合がブロックされると考えていました。最初にパイプから受け取り、次に参加する必要があります。しかし、どうやらそれは小さなサイズではブロックされません...?

編集:ドキュメント(http://docs.python.org/3/library/multiprocessing.html#multiprocessing-programming)から:

「デッドロックが発生する例は次のとおりです。」

from multiprocessing import Process, Queue

def f(q):
q.put('X' * 1000000)

if __name__ == '__main__':
    queue = Queue()
    p = Process(target=f, args=(queue,))
    p.start()
    p.join()                    # this deadlocks
    obj = queue.get()
于 2013-03-30T08:17:08.477 に答える