8

Webカメラとマイクの2つのセンサーから情報を収集するアプリケーションをデバッグしています。

一般的なアーキテクチャは非常に単純です:

  • メインプロセスは、パイプを介して子プロセス(それぞれに1つ)にメッセージ(start、stop、get_data)を送信します。
  • 子プロセスはデータを収集してメインプロセスに送信します

子プロセスとメインプロセスは、コマンドを処理するために無限ループになっています(ユーザーからのメインプロセス、メインプロセスからの子プロセス)。

グローバルに機能しますが、子プロセスを停止するのに問題があります。

私はコードを記録しました、そしてそれは2つのことを起こすようです:

  1. 「停止」メッセージは送信されますが、パイプを通過しません。
  2. 子プロセスは引き続きデータとconn.send(data)ブロックを送信します。

何も返さない子プロセスにはこの動作がないため、この動作は接続の状態に明確にリンクされています。それでも、合理的と思われる現在のアーキテクチャをデバッグ/変更する方法がわかりません。

では、このブロッキング動作の原因とそれを回避する方法は何ですか?

これは、子プロセスの無限ループの反復ごとに実行されるコードです。

    def do(self):
        while self.cnx.poll():
            msg = self.cnx.recv()
            self.queue.append(msg)
        #==
        if not self.queue:
            func_name = 'default_action'
            self.queue.append([func_name, ])
        #==
        msg             = self.queue.pop()
        func_name, args = msg[0], msg[1:]
        #==
        res = self.target.__getattribute__(func_name)(*args)
        #==
        running = func_name != 'stop'
        #==
        if res and self.send:
            assert running
            self.output_queue.append(res[0])
        if self.output_queue and running:
            self.cnx.send(self.output_queue.popleft())
        #==
        return running

更新:パイプを両端で同時に書き込むことはできないようです。上記のコードの最後の数行を次のように変更すると機能します。

        if self.output_queue and running:
            if not self.cnx.poll(): 
                self.cnx.send(self.output_queue.popleft())

パイプはデフォルトで全二重として文書化されており、この動作はまったく文書化されていないため、質問は未解決のままです。私は何かを誤解したに違いありません。教えてください!

アップデート2:明確にするために、この状況では接続は閉じられません。イベントのシーケンスを説明するには:

  • メインプロセスはメッセージを送信します(「停止」)(メッセージを送信する前に接続を空にします)
  • メインプロセスは、子プロセスが終了すると停止する(無限)ループに入ります。
  • その間、子プロセスは送信でブロックされ、メッセージを取得することはありません。
4

1 に答える 1

4

全二重multiprocessing.Pipeは として実装されsocketpair()ます。呼び出し.sendは、ソケットと通信するときに、すべての通常の理由でブロックされる可能性があります。あなたの説明に基づいて、あなたの読者がPipe読むのをやめて、データがカーネルのバッファに蓄積されて、あなたの.sendブロックのポイントになった可能性が高いと思います。

.close受信側を明示的に指定SIGPIPEすると、.send. 受信接続が範囲外になった場合、これはおそらく自動的に発生します。受信側への参照 (直接的または間接的) を保存しないように注意するだけで、問題を解決できる場合があります。これにより、そのスレッドがなくなったときに割り当てが解除されます。

ブロッキングの簡単なデモ.send:

import multiprocessing

a, b = multiprocessing.Pipe()

while True:
    print "send!"
    a.send("hello world")

しばらくすると、「send!」の印刷が終了することに注意してください。

于 2012-09-05T01:20:33.897 に答える