1

私は、Pythonでサブプロセスを使用してパイプを連鎖させ、行ごとに(事前に使用せずに)パイプを読み書きするために、次のコードを使用しているように見えますcommunicate()。このコードは、Unix コマンドを呼び出し ( mycmd)、その出力を読み取り、それを別の Unix コマンドの stdin に書き込み ( next_cmd)、最後のコマンドの出力をファイルにリダイレクトするだけです。

    # some unix command that uses a pipe: command "a"
    # writes to stdout and "b" reads it and writes to stdout
    mycmd = "a | b" 
    mycmd_proc = subprocess.Popen(mycmd, shell=True,
                                  stdin=sys.stdin,
                                  stdout=subprocess.PIPE,
                                  stderr=subprocess.PIPE)
    # nextCmd reads from stdin, and I'm passing it mycmd's output
    next_cmd = "nextCmd -stdin"
    output_file = open(output_filename, "w")
    next_proc = subprocess.Popen(next_cmd, shell=True,
                                  stdin=subprocess.PIPE,
                                  stdout=output_file)
    for line in iter(mycmd.stdout.readline, ''):
        # do something with line
        # ...
        # write it to next command
        next_proc.stdin.write(line)
    ### If I wanted to call another command here that passes next_proc output
    ### line by line to another command, would I need
    ### to call next_proc.communicate() first?
    next_proc.communicate()
    output_file.close()

これは機能しているように見えcommunicate()、コマンドの最後でのみ呼び出されます。

このコードを拡張して別のコマンドを追加しようとしているので、次のことができます。

mycmd1 | mycmd2 | mycmd3 > some_file

意味: 行ごとに、Python からmycmd1 の出力を読み取り、行を処理し、それを mycmd2 にフィードし、mycmd2 の出力を読み取り、それを 1 行ずつ処理して mycmd3 にフィードしますsome_file。これは可能ですか、またはデッドロック/ブロッキング/フラッシュされていないバッファで終了することはありますか? 次のコマンドにフィードする前に、間に Python を介入させ、各コマンドの出力を 1 行ずつ後処理したいので、3 つの UNIX コマンドをパイプとして呼び出すだけではないことに注意してください。

通信を呼び出してすべての出力をメモリにロードすることを避けたい - 代わりに行ごとに解析したい。ありがとう。

4

1 に答える 1

1

これは、任意の数のコマンドを処理する必要があります。

import sys
import subprocess

def processFirst(out):
    return out

def processSecond(out):
    return out

def processThird(out):
    return out

commands = [("a|b", processFirst), ("nextCmd -stdin", processSecond), ("thirdCmd", processThird)]

previous_output = None
for cmd,process_func in commands:
    if previous_output is None:
        stdin = sys.stdin
    else:
        stdin = subprocess.PIPE
    proc = subprocess.Popen(cmd, shell=True,
                            stdin = stdin,
                            stdout = subprocess.PIPE)
    if previous_output is not None:
        proc.stdin.write(previous_output)

    out,err = proc.communicate()
    out = process_func(out)
    previous_output = out

実行したいコマンドを、その出力を処理する関数とともにコマンドのリストに追加するだけです。最後のコマンドからの出力previous_outputは、ループの最後にあることになります。

デッドロック/バッファリング/その他の問題を回避するには、各コマンドを実行してproc.communicate()、出力を返すだけです(例のように直接読み取るのではなく)。次に、それを次のコマンドに入力してから、完了まで実行させます。

communicate()編集:前もって使用したくないこと、および行ごとに反応したいことに気付きました。それに対処するために、私の答えを少し編集します

この回答は、を使用してブロックせずにパイプから行ごとに読み取る方法の例を提供しますselect.select()

以下は、特定のケースでそれを使用する例です。

import sys
import subprocess
import select
import os

class LineReader(object):
    def __init__(self, fd, process_func):
        self._fd = fd
        self._buf = ''
        self._process_func = process_func
        self.next_proc = None

    def fileno(self):
        return self._fd

    def readlines(self):
        data = os.read(self._fd, 4096)
        if not data:
            # EOF
            if self.next_proc is not None:
                self.next_proc.stdin.close()
            return None
        self._buf += data
        if '\n' not in data:
            return []
        tmp = self._buf.split('\n')
        tmp_lines, self._buf = tmp[:-1], tmp[-1]
        lines = []
        for line in tmp_lines:
            lines.append(self._process_func(line))
            if self.next_proc is not None:
                self.next_proc.stdin.write("%s\n" % lines[-1])

        return lines

def processFirst(line):
    return line

def processSecond(line):
    return line

def processThird(line):
    return line

commands = [("a|b", processFirst), ("nextCmd -stdin", processSecond), ("thirdCmd", processThird)]

readers = []
previous_reader = None
for cmd,process_func in commands:
    if previous_reader is None:
        stdin = sys.stdin
    else:
        stdin = subprocess.PIPE
    proc = subprocess.Popen(cmd, shell=True,
                            stdin = stdin,
                            stdout = subprocess.PIPE)

    if previous_reader is not None:
        previous_reader.next_proc = proc

    previous_reader = LineReader(proc.stdout.fileno(), process_func)
    readers.append(previous_reader)


while readers:
    ready,_,_  = select.select(readers, [], [], 10.0)
    for stream in ready:
        lines = stream.readlines()
        if lines is None:
            readers.remove(stream)
于 2013-03-04T00:16:28.093 に答える