3

1つのパイプから3つの異なるプロセスにデータをフィードするにはどうすればよいですか?

nulfp = open(os.devnull, "w")

piper = Popen([
    "come command",
    "some params"
], stdout = PIPE, stderr = nulfp.fileno())

pipe_consumer_1 = Popen([
    "come command",
    "some params"
], stdin = piper.stdout, stderr = nulfp.fileno())

pipe_consumer_2 = Popen([
    "come command",
    "some params"
], stdin = piper.stdout, stderr = nulfp.fileno())

pipe_consumer_3 = Popen([
    "come command",
    "some params"
], stdin = piper.stdout, stderr = nulfp.fileno())

pipe_consumer_1.communicate()
pipe_consumer_2.communicate()
pipe_consumer_3.communicate()
piper.communicate()

上記のコードを実行すると、破損したファイルが生成されます。パイプの消費者がおそらくパイパーからの完全な出力を読み取っていないことを意味します。

これは適切に動作しますが、はるかに低速です。

nulfp = open(os.devnull, "w")

piper_1 = Popen([
    "come command",
    "some params"
], stdout = PIPE, stderr = nulfp.fileno())

piper_2 = Popen([
    "come command",
    "some params"
], stdout = PIPE, stderr = nulfp.fileno())

piper_3 = Popen([
    "come command",
    "some params"
], stdout = PIPE, stderr = nulfp.fileno())

pipe_consumer_1 = Popen([
    "come command",
    "some params"
], stdin = piper_1.stdout, stderr = nulfp.fileno())

pipe_consumer_2 = Popen([
    "come command",
    "some params"
], stdin = piper_2.stdout, stderr = nulfp.fileno())

pipe_consumer_3 = Popen([
    "come command",
    "some params"
], stdin = piper_3.stdout, stderr = nulfp.fileno())

pipe_consumer_1.communicate()
pipe_consumer_2.communicate()
pipe_consumer_3.communicate()
piper_1.communicate()
piper_2.communicate()
piper_3.communicate()

最初のコードスニペットを2番目のコードスニペットと同じように機能させる方法についての提案はありますか?私が最初のアプローチを機能させると、プロセスは3分の1の時間で終了します。

4

2 に答える 2

2

これは単一バイトの「ブロック」のみを使用しますが、アイデアは得られます。

from subprocess import Popen, PIPE

cat_proc = '/usr/bin/cat'

consumers = (Popen([cat_proc], stdin = PIPE, stdout = open('consumer1', 'w')),
             Popen([cat_proc], stdin = PIPE, stdout = open('consumer2', 'w')),
             Popen([cat_proc], stdin = PIPE, stdout = open('consumer3', 'w'))
)


with open('inputfile', 'r') as infile:
   for byte in infile:
       for consumer in consumers:
           consumer.stdin.write(byte)

テスト時、コンシューマ出力ファイルは入力ファイルと一致します。

編集:これは、1Kブロックのプロセスからの読み取りです。

from subprocess import Popen, PIPE

cat_proc = '/usr/bin/cat'

consumers = (Popen([cat_proc], stdin = PIPE, stdout = open('consumer1', 'w')),
             Popen([cat_proc], stdin = PIPE, stdout = open('consumer2', 'w')),
             Popen([cat_proc], stdin = PIPE, stdout = open('consumer3', 'w'))
)

producer = Popen([cat_proc, 'inputfile'], stdout = PIPE)

while True:
    byte = producer.stdout.read(1024)
    if not byte: break
    for consumer in consumers:
        consumer.stdin.write(byte)
于 2012-07-19T13:28:24.610 に答える
1

パイプからのデータは1回しか読み取ることができず、読み取られるとバッファから削除されます。これは、コンシューマープロセスがすべてデータのランダムな部分のみを表示することを意味します。これを組み合わせると、完全なストリームが得られます。もちろん、これはあまり役に立ちません。

プロデューサープロセスに書き込みをsubprocess.PIPE行い、このパイプからチャンクでバッファーに読み取り、このバッファーをすべてのコンシューマープロセスに書き込むことができます。これは、すべてのバッファ処理を自分で行う必要があることを意味します。この作業を行う方がおそらく簡単teeです。サンプルコードをすぐに投稿します。

于 2012-07-19T13:46:33.010 に答える