私はPythonでサブプロセスパイプラインをテストしています。以下のプログラムがPythonで直接実行できることは承知していますが、それは重要ではありません。パイプラインをテストしたいので、使用方法を知っています。
私のシステムはLinuxUbuntu9.04で、デフォルトはpython2.6です。
私はこのドキュメントの例から始めました。
from subprocess import Popen, PIPE
p1 = Popen(["grep", "-v", "not"], stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
output = p2.communicate()[0]
print output
それは機能しますが、p1
'sstdin
はリダイレクトされていないため、パイプにフィードするためにターミナルに入力する必要があります。^D
終了stdinと入力すると、必要な出力が得られます。
ただし、Python文字列変数を使用してパイプにデータを送信したいと思います。最初に私はstdinに書いてみました:
p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
p1.stdin.write('test\n')
output = p2.communicate()[0] # blocks forever here
動作しませんでした。p2.stdout.read()
最後の行で代わりに使用してみましたが、ブロックされます。追加p1.stdin.flush()
しましp1.stdin.close()
たが、どちらも機能しませんでした。それから私はコミュニケーションに移りました:
p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
p1.communicate('test\n') # blocks forever here
output = p2.communicate()[0]
だから、それはまだそれではありません。
単一のプロセスの実行(p1
上記のように、削除p2
)が完全に機能することに気づきました。p1
また、ファイルハンドルを( )に渡すこともできますstdin=open(...)
。したがって、問題は次のとおりです。
ブロッキングせずに、Pythonで2つ以上のサブプロセスのパイプラインにデータを渡すことは可能ですか?なぜだめですか?
シェルを実行してパイプラインをシェルで実行できることは承知していますが、それは私が望んでいることではありません。
更新1:以下のAaron Digullaのヒントに従って、スレッドを使用して機能させようとしています。
まず、スレッドでp1.communicateを実行してみました。
p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
t = threading.Thread(target=p1.communicate, args=('some data\n',))
t.start()
output = p2.communicate()[0] # blocks forever here
さて、動作しませんでした。に変更するなど、他の組み合わせも試してみまし.write()
たp2.read()
。何もない。次に、反対のアプローチを試してみましょう。
def get_output(subp):
output = subp.communicate()[0] # blocks on thread
print 'GOT:', output
p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
t = threading.Thread(target=get_output, args=(p2,))
t.start()
p1.communicate('data\n') # blocks here.
t.join()
コードはどこかでブロックされてしまいます。生成されたスレッド、メインスレッド、またはその両方。だからそれはうまくいきませんでした。それを機能させる方法を知っているなら、機能するコードを提供できればもっと簡単になるでしょう。私はここで試しています。
更新2
Paul Du Boisがいくつかの情報で以下に答えたので、私はさらにテストを行いました。モジュール全体を読み、subprocess.py
それがどのように機能するかを理解しました。だから私はそれをコードに正確に適用してみました。
私はLinuxを使用していますが、スレッドを使用してテストしていたため、最初のアプローチは、subprocess.py
のcommunicate()
メソッドで見られる正確なWindowsスレッドコードを複製することでしたが、1つではなく2つのプロセスでした。これが私が試したものの完全なリストです:
import os
from subprocess import Popen, PIPE
import threading
def get_output(fobj, buffer):
while True:
chunk = fobj.read() # BLOCKS HERE
if not chunk:
break
buffer.append(chunk)
p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
b = [] # create a buffer
t = threading.Thread(target=get_output, args=(p2.stdout, b))
t.start() # start reading thread
for x in xrange(100000):
p1.stdin.write('hello world\n') # write data
p1.stdin.flush()
p1.stdin.close() # close input...
t.join()
上手。うまくいきませんでした。p1.stdin.close()
呼び出された後も、p2.stdout.read()
まだブロックします。
次に、posixコードを試してみましたsubprocess.py
:
import os
from subprocess import Popen, PIPE
import select
p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
numwrites = 100000
to_read = [p2.stdout]
to_write = [p1.stdin]
b = [] # create buffer
while to_read or to_write:
read_now, write_now, xlist = select.select(to_read, to_write, [])
if read_now:
data = os.read(p2.stdout.fileno(), 1024)
if not data:
p2.stdout.close()
to_read = []
else:
b.append(data)
if write_now:
if numwrites > 0:
numwrites -= 1
p1.stdin.write('hello world!\n'); p1.stdin.flush()
else:
p1.stdin.close()
to_write = []
print b
また、をブロックしselect.select()
ます。sを広めることによってprint
、私はこれを見つけました:
- 読書は機能しています。コードは実行中に何度も読み取ります。
- ライティングも機能しています。データはに書き込まれ
p1.stdin
ます。 - の終わりに
numwrites
、p1.stdin.close()
が呼び出されます。 select()
ブロックを開始すると、何かしかありto_read
ませんp2.stdout
。to_write
すでに空です。os.read()
callは常に何かを返すため、p2.stdout.close()
呼び出されることはありません。
両方のテストからの結論stdin
:(例では)パイプラインの最初のプロセスを閉じることはgrep
、バッファリングされた出力を次のプロセスにダンプして死ぬことにはなりません。
それを機能させる方法はありませんか?
PS:一時ファイルは使いたくありません。すでにファイルでテストしていて、動作することはわかっています。そして、私はウィンドウを使いたくありません。