私はたくさん検索しましたが、実行中の python サブプロセスから Tornado に出力を取得する方法が見つかりませんでした。私が欲しいのはTravis CIのようなものです。管理ページでジョブを開始すると、サーバーがリクエストを受け取り、サブプロセスを開始します。このサブプロセスは、いくつかのデータ マイニングを行い、文字列バッファーにいくつかのログをフィードします。settimeout または websocket を使用した ajax でこのログを取得し、このログをページに出力します。ユーザーがページを閉じて後で戻ってきても、ログが残り、通常は更新されます。うーん、トラヴィスにとてもよく似ています。
1 に答える
このブログ投稿は、これを行う方法を示しています: http://stefaanlippens.net/python-asynchronous-subprocess-pipe-reading
基本的に、この投稿は、stdout と stderr の両方を非同期的に読み取ることにより、プロセスの出力を読み取るときにデッドロックを防ぐ方法を示しています。producer
コマンド fromを置き換えて、__main__
好きなコマンドを実行し、print ステートメントをコードに置き換えて、Tornado で出力を処理することができます。
更新: ブログが削除された場合に備えて、次の情報を含めました。
...たとえば、実行時間の長いプロセスを監視するために、標準出力とエラーを 1 行ずつ読みたい場合はどうすればよいでしょうか? Web では、さまざまな程度の複雑さ、抽象化、および依存関係を持つ多くのソリューションを見つけることができます。1 つの解決策 (コードが制限され、標準ライブラリ以外の依存関係がない場合) は、別のスレッドでパイプを読み取ることです。これにより、1 つのパイプが別のパイプをブロックできなくなります。
以下のコードは、実装例を示しています。スクリプトは、親プロセスと子プロセスの両方で使用されるように設定されています。
子プロセスの場合: 'produce' 引数を指定して呼び出されると、標準出力と標準エラーでいくつかの行をランダムにレンダリングする Produce() 関数が実行されます。行間に少しの遅延があり、実行時間の長いプロセスをシミュレートします。consumer() 関数に実装されている親プロセス (引数なしで呼び出されるスクリプト) は、サブプロセスと同じスクリプトを「子モード」で呼び出し、各行がどのパイプから来るかを事前に知らなくても、その出力を行ごとに監視します。
AsynchronousFileReader クラスは、標準出力パイプとエラー パイプを非同期に読み取り、各行をキューに入れるスレッド用です。次に、メインスレッドは、行がキューに入るのを監視することにより、サブプロセスを監視できます。
import sys
import subprocess
import random
import time
import threading
import Queue
class AsynchronousFileReader(threading.Thread):
'''
Helper class to implement asynchronous reading of a file
in a separate thread. Pushes read lines on a queue to
be consumed in another thread.
'''
def __init__(self, fd, queue):
assert isinstance(queue, Queue.Queue)
assert callable(fd.readline)
threading.Thread.__init__(self)
self._fd = fd
self._queue = queue
def run(self):
'''The body of the tread: read lines and put them on the queue.'''
for line in iter(self._fd.readline, ''):
self._queue.put(line)
def eof(self):
'''Check whether there is no more content to expect.'''
return not self.is_alive() and self._queue.empty()
def consume(command):
'''
Example of how to consume standard output and standard error of
a subprocess asynchronously without risk on deadlocking.
'''
# Launch the command as subprocess.
process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
# Launch the asynchronous readers of the process' stdout and stderr.
stdout_queue = Queue.Queue()
stdout_reader = AsynchronousFileReader(process.stdout, stdout_queue)
stdout_reader.start()
stderr_queue = Queue.Queue()
stderr_reader = AsynchronousFileReader(process.stderr, stderr_queue)
stderr_reader.start()
# Check the queues if we received some output (until there is nothing more to get).
while not stdout_reader.eof() or not stderr_reader.eof():
# Show what we received from standard output.
while not stdout_queue.empty():
line = stdout_queue.get()
print 'Received line on standard output: ' + repr(line)
# Show what we received from standard error.
while not stderr_queue.empty():
line = stderr_queue.get()
print 'Received line on standard error: ' + repr(line)
# Sleep a bit before asking the readers again.
time.sleep(.1)
# Let's be tidy and join the threads we've started.
stdout_reader.join()
stderr_reader.join()
# Close subprocess' file descriptors.
process.stdout.close()
process.stderr.close()
def produce(items=10):
'''
Dummy function to randomly render a couple of lines
on standard output and standard error.
'''
for i in range(items):
output = random.choice([sys.stdout, sys.stderr])
output.write('Line %d on %s\n' % (i, output))
output.flush()
time.sleep(random.uniform(.1, 1))
if __name__ == '__main__':
# The main flow:
# if there is an command line argument 'produce', act as a producer
# otherwise be a consumer (which launches a producer as subprocess).
if len(sys.argv) == 2 and sys.argv[1] == 'produce':
produce(10)
else:
consume(['python', sys.argv[0], 'produce'])