私のアプリケーションでは、マルチプロセッシング モジュールのパイプを使用して Python プロセス間で通信しています。最近、送信しているデータのサイズに応じて、奇妙な動作を観察しました。Python のドキュメントによると、これらのパイプは接続に基づいており、非同期で動作するはずですが、送信時にスタックすることがあります。各接続で全二重を有効にすると、送信と受信の両方に接続を使用していなくても、すべて正常に動作します。誰でもこの動作を説明できますか?
- 浮動小数点数 100、全二重無効
非同期性を利用してコードが動作します。 - 100 個の浮動小数点数、全二重が有効
例は期待どおりにうまく機能します。 - 10000 フロート、全二重無効
小さいデータで問題なかったとしても、実行は永久にブロックされます。 -
10000 フロート、全二重で再度 Fine を有効にします。
コード (これは私の製品コードではありません。意味を示しているだけです):
from collections import deque
from multiprocessing import Process, Pipe
from numpy.random import randn
from os import getpid
PROC_NR = 4
DATA_POINTS = 100
# DATA_POINTS = 10000
def arg_passer(pipe_in, pipe_out, list_):
my_pid = getpid()
print "{}: Before send".format(my_pid)
pipe_out.send(list_)
print "{}: After send, before recv".format(my_pid)
buf = pipe_in.recv()
print "{}: After recv".format(my_pid)
if __name__ == "__main__":
pipes = [Pipe(False) for _ in range(PROC_NR)]
# pipes = [Pipe(True) for _ in range(PROC_NR)]
pipes_in = deque(p[0] for p in pipes)
pipes_out = deque(p[1] for p in pipes)
pipes_in.rotate(1)
pipes_out.rotate(-1)
data = [randn(DATA_POINTS) for foo in xrange(PROC_NR)]
processes = [Process(target=arg_passer, args=(pipes_in[foo], pipes_out[foo], data[foo]))
for foo in xrange(PROC_NR)]
for proc in processes:
proc.start()
for proc in processes:
proc.join()