10

私のアプリケーションでは、マルチプロセッシング モジュールのパイプを使用して Python プロセス間で通信しています。最近、送信しているデータのサイズに応じて、奇妙な動作を観察しました。Python のドキュメントによると、これらのパイプは接続に基づいており、非同期で動作するはずですが、送信時にスタックすることがあります。各接続で全二重を有効にすると、送信と受信の両方に接続を使用していなくても、すべて正常に動作します。誰でもこの動作を説明できますか?

  1. 浮動小数点数 100、全二重無効
    非同期性を利用してコードが動作します。
  2. 100 個の浮動小数点数、全二重が有効
    例は期待どおりにうまく機能します。
  3. 10000 フロート、全二重無効
    小さいデータで問題なかったとしても、実行は永久にブロックされます。

  4. 10000 フロート、全二重で再度 Fine を有効にします。

コード (これは私の製品コードではありません。意味を示しているだけです):

from collections import deque
from multiprocessing import Process, Pipe
from numpy.random import randn
from os import getpid

PROC_NR = 4
DATA_POINTS = 100
# DATA_POINTS = 10000


def arg_passer(pipe_in, pipe_out, list_):
    my_pid = getpid()
    print "{}: Before send".format(my_pid)
    pipe_out.send(list_)
    print "{}: After send, before recv".format(my_pid)
    buf = pipe_in.recv()
    print "{}: After recv".format(my_pid)


if __name__ == "__main__":
    pipes = [Pipe(False) for _ in range(PROC_NR)]
    # pipes = [Pipe(True) for _ in range(PROC_NR)]
    pipes_in = deque(p[0] for p in pipes)
    pipes_out = deque(p[1] for p in pipes)
    pipes_in.rotate(1)
    pipes_out.rotate(-1)

    data = [randn(DATA_POINTS) for foo in xrange(PROC_NR)]
    processes = [Process(target=arg_passer, args=(pipes_in[foo], pipes_out[foo], data[foo]))
                 for foo in xrange(PROC_NR)]

    for proc in processes:
        proc.start()

    for proc in processes:
        proc.join()
4

1 に答える 1

14

まず、multiprocessing.Pipeクラスの実装に注目する価値があります...

def Pipe(duplex=True):
    '''
    Returns pair of connection objects at either end of a pipe
    '''
    if duplex:
        s1, s2 = socket.socketpair()
        s1.setblocking(True)
        s2.setblocking(True)
        c1 = _multiprocessing.Connection(os.dup(s1.fileno()))
        c2 = _multiprocessing.Connection(os.dup(s2.fileno()))
        s1.close()
        s2.close()
    else:
        fd1, fd2 = os.pipe()
        c1 = _multiprocessing.Connection(fd1, writable=False)
        c2 = _multiprocessing.Connection(fd2, readable=False)

    return c1, c2

違いは、半二重の「パイプ」は匿名パイプを使用しますが、全二重の「パイプ」は実際にはUnix ドメイン ソケットを使用します。これは、匿名パイプが本来単方向であるためです。

このコンテキストでの「非同期」という用語の意味がわかりません。「非ブロッキング I/O」を意味する場合は、両方の実装がデフォルトでブロッキング I/O を使用することに注意してください。


第二に、送信しようとしているデータのピクルサイズに注意する価値があります...

>>> from numpy.random import randn
>>> from cPickle import dumps
>>> len(dumps(randn(100)))
2479
>>> len(dumps(randn(10000)))
237154

第三に、pipe(7)マンページから...

パイプ容量

パイプには限られた容量があります。パイプがいっぱいの場合、O_NONBLOCK フラグが設定されているかどうかに応じて、write(2) がブロックされるか失敗します (以下を参照)。実装が異なれば、パイプ容量の制限も異なります。アプリケーションは特定の容量に依存するべきではありません。アプリケーションは、書き込みプロセスがブロックされたままにならないように、データが利用可能になるとすぐに読み取りプロセスがデータを消費するように設計する必要があります。

2.6.11 より前の Linux バージョンでは、パイプの容量はシステム ページ サイズと同じでした (たとえば、i386 では 4096 バイト)。Linux 2.6.11 以降、パイプ容量は 65536 バイトです。


pipe_out.send()したがって、事実上、すべてのサブプロセスが呼び出しでブロックされ、他のプロセスからデータを受信できないデッドロックが発生しました。 65,536 バイトのバッファがいっぱいになりました。

Unix ドメイン ソケット バージョンを使用したくなるかもしれませんが、現時点で動作する唯一の理由は、パイプよりも大きいバッファー サイズを持っていることですDATA_POINTS。 100,000。

「クイック アンド ダーティー ハック」ソリューションは、送信のためにデータを小さなチャンクに分割することですが、特定のサイズのバッファーに依存することはお勧めできません。

より良い解決策は、呼び出しで非ブロッキング I/O を使用することですが、そのモジュールを使用してそれを実現する最善の方法を判断するには、モジュールにpipe_out.send()精通していません。multiprocessing

擬似コードは次のようになります...

while 1:
    if we have sent all data and received all data:
        break
    send as much data as we can without blocking
    receive as much data as we can without blocking
    if we didn't send or receive anything in this iteration:
        sleep for a bit so we don't waste CPU time
        continue

...または、Python モジュールを使用して、select必要以上に長くスリープしないようにすることもできますが、これを統合するのはmultiprocessing.Pipe難しい場合があります。

クラスがこれらすべてを実行する可能性はありmultiprocessing.Queueますが、私はこれまで使用したことがないため、いくつかの実験を行う必要があります。

于 2013-04-24T12:59:22.567 に答える