239

流体力学コードのドライバーとして python スクリプトを使用しています。シミュレーションを実行するときはsubprocess.Popen、コードを実行し、出力を --- から収集stdoutstderrsubprocess.PIPE--- 出力情報を出力 (およびログ ファイルに保存) し、エラーがないかどうかを確認します。問題は、コードの進行状況がわからないことです。コマンド ラインから直接実行すると、どの反復か、何時か、次の時間ステップは何かなどの出力が得られます。

出力を保存する方法 (ロギングとエラー チェック用) と、ライブ ストリーミング出力を生成する方法はありますか?

私のコードの関連セクション:

ret_val = subprocess.Popen( run_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True )
output, errors = ret_val.communicate()
log_file.write(output)
print output
if( ret_val.returncode ):
    print "RUN failed\n\n%s\n\n" % (errors)
    success = False

if( errors ): log_file.write("\n\n%s\n\n" % errors)

もともと、コピーがログファイルに直接送られ、ストリームが引き続き端末に直接出力されるように、パイプrun_commandを介してパイプしていましたが、その方法ではエラーを保存できません(私の知る限り)。tee


これまでの私の一時的な解決策:

ret_val = subprocess.Popen( run_command, stdout=log_file, stderr=subprocess.PIPE, shell=True )
while not ret_val.poll():
    log_file.flush()

次に、別の端末でtail -f log.txt(st log_file = 'log.txt') を実行します。

4

18 に答える 18

211

Python 3 の TLDR:

import subprocess
import sys
with open('test.log', 'wb') as f: 
    process = subprocess.Popen(your_command, stdout=subprocess.PIPE)
    for c in iter(lambda: process.stdout.read(1), b''): 
        sys.stdout.buffer.write(c)
        f.buffer.write(c)

readこれを行うには 2 つの方法があります。またはreadline 関数からイテレータを作成し、次のようにします。

import subprocess
import sys
with open('test.log', 'w') as f:  # replace 'w' with 'wb' for Python 3
    process = subprocess.Popen(your_command, stdout=subprocess.PIPE)
    for c in iter(lambda: process.stdout.read(1), ''):  # replace '' with b'' for Python 3
        sys.stdout.write(c)
        f.write(c)

また

import subprocess
import sys
with open('test.log', 'w') as f:  # replace 'w' with 'wb' for Python 3
    process = subprocess.Popen(your_command, stdout=subprocess.PIPE)
    for line in iter(process.stdout.readline, ''):  # replace '' with b'' for Python 3
        sys.stdout.write(line)
        f.write(line)

またはreaderwriterファイルを作成することもできます。を に渡し、writerからPopen読み取るreader

import io
import time
import subprocess
import sys

filename = 'test.log'
with io.open(filename, 'wb') as writer, io.open(filename, 'rb', 1) as reader:
    process = subprocess.Popen(command, stdout=writer)
    while process.poll() is None:
        sys.stdout.write(reader.read())
        time.sleep(0.5)
    # Read the remaining
    sys.stdout.write(reader.read())

test.logこのようにして、標準出力と同様にデータが書き込まれます。

ファイル アプローチの唯一の利点は、コードがブロックされないことです。そのため、その間は好きなことをして、好きなときreaderにノンブロッキングで読むことができます。を使用するPIPEと、readおよびreadline関数は、それぞれ 1 文字がパイプに書き込まれるか、行がパイプに書き込まれるまでブロックされます。

于 2013-08-24T19:23:08.693 に答える
2

ラインバッファ出力がうまくいくように見えます。その場合、次のようなものが適しているかもしれません。(注意: テストされていません。) これは、サブプロセスの標準出力のみをリアルタイムで提供します。stderr と stdout の両方をリアルタイムで取得したい場合は、より複雑な操作を行う必要がありますselect

proc = subprocess.Popen(run_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
while proc.poll() is None:
    line = proc.stdout.readline()
    print line
    log_file.write(line + '\n')
# Might still be data on stdout at this point.  Grab any
# remainder.
for line in proc.stdout.read().split('\n'):
    print line
    log_file.write(line + '\n')
# Do whatever you want with proc.stderr here...
于 2013-08-24T19:21:35.450 に答える
2

このメソッドは少し誤解を招くと思います。subprocess.communicate実際には、で指定したstdoutstderrsubprocess.Popenを埋めます。

それでも、のstdoutおよびstderrsubprocess.PIPEパラメーターに提供できるから読み取ると、最終的に OS パイプ バッファーがいっぱいになり、アプリがデッドロックします (特に、 を使用する必要がある複数のプロセス/スレッドがある場合)。subprocess.Popensubprocess

私が提案する解決策は、stdoutstderrにファイルを提供し、デッドロックから読み取る代わりにファイルの内容を読み取ることですPIPE。これらのファイルはtempfile.NamedTemporaryFile()- によって書き込まれている間、読み取りのためにアクセスすることもできますsubprocess.communicate

以下は使用例です。

        try:
            with ProcessRunner(('python', 'task.py'), env=os.environ.copy(), seconds_to_wait=0.01) as process_runner:
                for out in process_runner:
                    print(out)
        catch ProcessError as e:
            print(e.error_message)
            raise

そして、これが何をするかを説明するために提供できる限り多くのコメントを付けて使用する準備ができているソース コードです。

Python 2 を使用している場合は、最初に最新バージョンのsubprocess32パッケージを pypi からインストールしてください。


import os
import sys
import threading
import time
import tempfile
import logging

if os.name == 'posix' and sys.version_info[0] < 3:
    # Support python 2
    import subprocess32 as subprocess
else:
    # Get latest and greatest from python 3
    import subprocess

logger = logging.getLogger(__name__)


class ProcessError(Exception):
    """Base exception for errors related to running the process"""


class ProcessTimeout(ProcessError):
    """Error that will be raised when the process execution will exceed a timeout"""


class ProcessRunner(object):
    def __init__(self, args, env=None, timeout=None, bufsize=-1, seconds_to_wait=0.25, **kwargs):
        """
        Constructor facade to subprocess.Popen that receives parameters which are more specifically required for the
        Process Runner. This is a class that should be used as a context manager - and that provides an iterator
        for reading captured output from subprocess.communicate in near realtime.

        Example usage:


        try:
            with ProcessRunner(('python', task_file_path), env=os.environ.copy(), seconds_to_wait=0.01) as process_runner:
                for out in process_runner:
                    print(out)
        catch ProcessError as e:
            print(e.error_message)
            raise

        :param args: same as subprocess.Popen
        :param env: same as subprocess.Popen
        :param timeout: same as subprocess.communicate
        :param bufsize: same as subprocess.Popen
        :param seconds_to_wait: time to wait between each readline from the temporary file
        :param kwargs: same as subprocess.Popen
        """
        self._seconds_to_wait = seconds_to_wait
        self._process_has_timed_out = False
        self._timeout = timeout
        self._process_done = False
        self._std_file_handle = tempfile.NamedTemporaryFile()
        self._process = subprocess.Popen(args, env=env, bufsize=bufsize,
                                         stdout=self._std_file_handle, stderr=self._std_file_handle, **kwargs)
        self._thread = threading.Thread(target=self._run_process)
        self._thread.daemon = True

    def __enter__(self):
        self._thread.start()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self._thread.join()
        self._std_file_handle.close()

    def __iter__(self):
        # read all output from stdout file that subprocess.communicate fills
        with open(self._std_file_handle.name, 'r') as stdout:
            # while process is alive, keep reading data
            while not self._process_done:
                out = stdout.readline()
                out_without_trailing_whitespaces = out.rstrip()
                if out_without_trailing_whitespaces:
                    # yield stdout data without trailing \n
                    yield out_without_trailing_whitespaces
                else:
                    # if there is nothing to read, then please wait a tiny little bit
                    time.sleep(self._seconds_to_wait)

            # this is a hack: terraform seems to write to buffer after process has finished
            out = stdout.read()
            if out:
                yield out

        if self._process_has_timed_out:
            raise ProcessTimeout('Process has timed out')

        if self._process.returncode != 0:
            raise ProcessError('Process has failed')

    def _run_process(self):
        try:
            # Start gathering information (stdout and stderr) from the opened process
            self._process.communicate(timeout=self._timeout)
            # Graceful termination of the opened process
            self._process.terminate()
        except subprocess.TimeoutExpired:
            self._process_has_timed_out = True
            # Force termination of the opened process
            self._process.kill()

        self._process_done = True

    @property
    def return_code(self):
        return self._process.returncode



于 2019-02-25T14:46:36.660 に答える