4

なんらかの理由でMacにターミナルアクセスがないため、基本的なターミナルエミュレーションスクリプトを作成しようとしています。しかし、blenderでゲームエンジンスクリプトを書くには、通常blenderを起動したターミナルで開くコンソールが重要です。
削除や名前の変更などの単純なことを行うために。以前は、を使用してコマンドを実行していstream = os.popen(command)ましprint (stream.read())た。これはほとんどの場合は正常に機能しますが、インタラクティブなものには機能しません。
すぐに私は新しい方法を発見しました:
sp = subprocess.Popen(["/bin/bash", "-i"], stdout = subprocess.PIPE, stdin = subprocess.PIPE, stderr = subprocess.PIPE)そしてprint(sp.communicate(command.encode()))。それは私がターミナルのように使うことができるインタラクティブなシェルを生み出すはずですよね?

しかし、どちらの方法でも接続を開いたままにすることはできません。最後の例を使用すると、sp.communicate一度呼び出すことができ、次の出力(この場合は'ls /'の場合)といくつかのエラーが表示されます
(b'Applications\n[...]usr\nvar\n', b'bash: no job control in this shell\nbash-3.2$ ls /\nbash-3.2$ exit\n')。2回目はValueError: I/O operation on closed file. 時々('ls'のように)このエラーが発生するだけです:b'ls\nbash-3.2$ exit\n'

どういう意味ですか?インタラクティブシェルを制御したり、blenderを実行してコンソールと通信したりできるようにするPythonで端末をエミュレートするにはどうすればよいですか?

4

3 に答える 3

10

入力を要求し続けるインタラクティブシェルが必要な場合は、次のことを試すことができます。

import subprocess
import re

while True:
    # prevents lots of python error output
    try:
        s = raw_input('> ')
    except:
        break

    # check if you should exit
    if s.strip().lower() == 'exit':
        break

    # try to run command
    try:
        cmd = subprocess.Popen(re.split(r'\s+', s), stdout=subprocess.PIPE)
        cmd_out = cmd.stdout.read()

        # Process output
        print cmd_out

    except OSError:
        print 'Invalid command'
于 2012-07-24T16:13:05.943 に答える
5

これが私がウィンドウでやりたいことをするために取り組んだことです。ウィンドウは標準に準拠しておらず、独自の標準に準拠していないため、はるかに難しい問題です。このコードを少し変更すると、探しているものが正確に得られるはずです。

'''
Created on Mar 2, 2013

@author: rweber
'''
import subprocess
import Queue
from Queue import Empty
import threading


class Process_Communicator():

    def join(self):
        self.te.join()
        self.to.join()
        self.running = False
        self.aggregator.join()

    def enqueue_in(self):
        while self.running and self.p.stdin is not None:
            while not self.stdin_queue.empty():
                s = self.stdin_queue.get()
                self.p.stdin.write(str(s) + '\n\r')
            pass

    def enqueue_output(self):
        if not self.p.stdout or self.p.stdout.closed:
            return
        out = self.p.stdout
        for line in iter(out.readline, b''):
            self.qo.put(line)

    def enqueue_err(self):
        if not self.p.stderr or self.p.stderr.closed:
            return
        err = self.p.stderr
        for line in iter(err.readline, b''):
            self.qe.put(line)

    def aggregate(self):
        while (self.running):
            self.update()
        self.update()

    def update(self):
        line = ""
        try:
            while self.qe.not_empty:
                line = self.qe.get_nowait()  # or q.get(timeout=.1)
                self.unbblocked_err += line
        except Empty:
            pass

        line = ""
        try:
            while self.qo.not_empty:
                line = self.qo.get_nowait()  # or q.get(timeout=.1)
                self.unbblocked_out += line
        except Empty:
            pass

        while not self.stdin_queue.empty():
                s = self.stdin_queue.get()
                self.p.stdin.write(str(s) + '\n\r')

    def get_stdout(self, clear=True):
        ret = self.unbblocked_out
        if clear:
            self.unbblocked_out = ""
        return ret

    def has_stdout(self):
        ret = self.get_stdout(False)
        if ret == '':
            return None
        else:
            return ret

    def get_stderr(self, clear=True):
        ret = self.unbblocked_err
        if clear:
            self.unbblocked_err = ""
        return ret

    def has_stderr(self):
        ret = self.get_stderr(False)
        if ret == '':
            return None
        else:
            return ret

    def __init__(self, subp):
        '''This is a simple class that collects and aggregates the
        output from a subprocess so that you can more reliably use
        the class without having to block for subprocess.communicate.'''
        self.p = subp
        self.unbblocked_out = ""
        self.unbblocked_err = ""
        self.running = True
        self.qo = Queue.Queue()
        self.to = threading.Thread(name="out_read",
                                    target=self.enqueue_output,
                                    args=())
        self.to.daemon = True  # thread dies with the program
        self.to.start()

        self.qe = Queue.Queue()
        self.te = threading.Thread(name="err_read",
                                   target=self.enqueue_err,
                                   args=())
        self.te.daemon = True  # thread dies with the program
        self.te.start()

        self.stdin_queue = Queue.Queue()
        self.aggregator = threading.Thread(name="aggregate",
                                           target=self.aggregate,
                                           args=())
        self.aggregator.daemon = True  # thread dies with the program
        self.aggregator.start()
        pass
def write_stdin(p,c):
    while p.poll() == None:
        i = raw_input("send to process:")
        if i is not None:
            c.stdin_queue.put(i)


p = subprocess.Popen("cmd.exe", shell=True, stdout=subprocess.PIPE,
                     stderr=subprocess.PIPE, stdin=subprocess.PIPE)
c = Process_Communicator(p)
stdin = threading.Thread(name="write_stdin",
                           target=write_stdin,
                           args=(p,c))
stdin.daemon = True  # thread dies with the program
stdin.start()
while p.poll() == None:
    if c.has_stdout():
        print c.get_stdout()
    if c.has_stderr():
        print c.get_stderr()

c.join()
print "Exit"
于 2013-05-17T18:36:57.447 に答える
1

「forkpty」を割り当てた新しい制御端末で実行する必要があるようです。「ジョブ制御なし...」という警告を抑制するには、「setsid」を呼び出す必要があります。

于 2012-07-28T06:18:37.280 に答える