2

Windowsにインタラクティブなコンソールプログラムがあります。'e''c'のようなキーストロークを押してcmdウィンドウに移動する必要があります。人間の操作には便利ですが、プログラムの自動化には非常に困難です。

そして今、Pythonのような他のプログラムで操作しやすくするために、コンソールプログラムをラップしたいと思います。

ただし、コンソールプログラムは'getch()'を使用して、標準入力ではないキーボード入力を取得します。したがって、単純にキーをstdinに送信することはできません。

誰かがこの問題に遭遇したことがありますか?ありがとう。

    class ThreadWorker(threading.Thread):
        def __init__(self, callable, *args, **kwargs):
            super(ThreadWorker, self).__init__()
            self.callable = callable
            self.args = args
            self.kwargs = kwargs
            self.setDaemon(True)

        def run(self):
            try:
                self.callable(*self.args, **self.kwargs)
            except Exception, e:
                print e

        def start(self):
            global running
            running = True
            threading.Thread.start(self)

    def console_presskey(char):
        proc.stdin.write(char)
        proc.stdin.flush()

    def start():
        def worker(pipe):
            while running:
                line = pipe.readline()
                if line == '': 
                    break
                else:
                    print line,

        proc = Popen("something.exe",stdout=PIPE,stdin=PIPE)

        stdout_worker = ThreadWorker(worker, proc.stdout)
        stderr_worker = ThreadWorker(worker, proc.stderr)
        stdout_worker.start()
        stderr_worker.start()


    if __name__ == "__main__":
        start()
        sleep(2)
        console_presskey('e')
        sleep(2)
        console_presskey('c')

編集:

最後に、win32SendMessage関数を使用して作業を行います。新しいサブプロセスをフォークしてから非表示にし、hWndとpidを取得します。

コードは次のとおりです。

import threading
import re
import os
from subprocess import Popen, PIPE,STDOUT,CREATE_NEW_CONSOLE,STARTUPINFO,STARTF_USESHOWWINDOW,SW_HIDE
from time import sleep
import win32process,win32con,win32gui,win32api

TargetPower = 'N/A'
Mode = 'N/A'
Channel = 'N/A'
con_stdin = ''
con_stdout = ''
stdout_worker = ''
stdin_woker = ''
running = False
proc = ''
Console_hwnd = ''
#status = 'NotStarted' # or Running 
class ThreadWorker(threading.Thread):
    def __init__(self, callable, *args, **kwargs):
        super(ThreadWorker, self).__init__()
        self.callable = callable
        self.args = args
        self.kwargs = kwargs
        self.setDaemon(True)

    def run(self):
        try:
            self.callable(*self.args, **self.kwargs)
        except Exception, e:
            print e

    def start(self):
        global running
        running = True
        threading.Thread.start(self)
def get_hwnds_for_pid (pid):
    def callback (hwnd, hwnds):
        #if win32gui.IsWindowVisible(hwnd) and win32gui.IsWindowEnabled(hwnd):
        _, found_pid = win32process.GetWindowThreadProcessId(hwnd)
        #print hwnd
        if found_pid == pid:
            hwnds.append(hwnd)
        return True
    hwnds = []
    win32gui.EnumWindows(callback, hwnds)
    #if hwnds == []:
        #raise 
    return hwnds

def sendkey(char):
    global Console_hwnd
    hwnd = Console_hwnd[0]
    code = ord(char)
    win32api.SendMessage(hwnd, win32con.WM_CHAR, code, 0)
    print '[*]',char,'Pressed.'

    #Another Keypress example. only works with keycode
    #win32api.PostMessage(hwnd, win32con.WM_KEYDOWN, win32con.VK_F9, 0)
    #print char,code,"down"
    #win32api.PostMessage(hwnd, win32con.WM_KEYUP, code, 0)
    #print char,code,"up"
def sendesc():
    global Console_hwnd
    hwnd = Console_hwnd[0]
    win32api.PostMessage(hwnd, win32con.WM_KEYDOWN, win32con.VK_ESCAPE, 0)
    print '[*]',"Esc down"
    win32api.PostMessage(hwnd, win32con.WM_KEYUP, win32con.VK_ESCAPE, 0)
    print '[*]',"Esc up"

def start():
    def worker(pipe):
        global TargetPower
        global Mode
        global Channel
        global running
        while running:
            line = pipe.readline()
            if line == '':
                break
            elif line.startswith('|') or line.startswith('==='):
                pass
            elif line.startswith("Operating in"):
                info = line
                for i in range(7):
                    info += pipe.readline()
                    #print 'ART> '+info,
                try:
                    TargetPower = eval(re.search(r'(Output|Target) (power|Power) = .{4}',info).group(0).split('=')[1])
                    #27.0
                    Mode = re.search(r'gOffRate = (.+?),',info).group(1).lstrip().rstrip()
                    #6MBps
                    Channel = re.search(r'channel ([\d.]+GHz),',info).group(1)
                    #2.412GHz
                except Exception,e:
                    TargetPower = 'N/A'
                    Mode = 'N/A'
                    Channel = 'N/A'
                    print e
            elif line =='\r\n':
                print '>',
            else:
                print 'ART>'+line,
        print 'worker done.'
    global proc
    si = STARTUPINFO()
    si.dwFlags |= STARTF_USESHOWWINDOW
    si.wShowWindow = SW_HIDE
    #proc = Popen("art.bat",stdout=PIPE,creationflags=CREATE_NEW_CONSOLE) #qt works!
    proc = Popen("art.bat",stdout=PIPE,creationflags=CREATE_NEW_CONSOLE,startupinfo=si)
    #proc = Popen("art.bat",stdout=PIPE,startupinfo=si)  #hidden
    #proc = Popen("cmd")  #for test
    sleep(2)
    print '[*] pid: ',proc.pid

    global Console_hwnd
    Console_hwnd = get_hwnds_for_pid(proc.pid)
    print '[*] hwnd:',Console_hwnd[0]

    global stdout_worker
    global stderr_worker
    stdout_worker = ThreadWorker(worker, proc.stdout)
    stderr_worker = ThreadWorker(worker, proc.stderr)
    stdout_worker.start()
    stderr_worker.start()


def stop():
    global stdout_worker
    global stderr_worker
    global running
    print 'stop'
    global proc
    sendesc()
    sendesc()
    sendesc()
    Popen("taskkill /F /T /PID %i"%proc.pid , shell=True)
    try:
        running = False
        TargetPower = 'N/A'
        Mode = 'N/A'
        Channel = 'N/A'
    except Exception,e:
        print e

if __name__ == "__main__":
    start()
    sleep(1)
    sendkey('e')
    sleep(1)
    sendkey('c')
    sleep(10)
    stop()
    while True:
        pass
4

2 に答える 2

0

私の知る限り、標準入力を使用getch() します。アプリケーションはその入力を受け取りましたが、その出力を受け取っていないのでしょうか? 出力が端末に向けられていない場合、出力がバッファリングされることが多いため、これは一般的な問題です。各行の後に出力をフラッシュするようにプログラムを調整できますか?

あなたの Python コードには多くの問題があります。stderr のパイプを作成しておらず、proc 変数が start メソッドに対してローカルであるように見えます。

当分の間、サブプロセスの出力を Python プロセスから継承したままにしておくことをお勧めします。これにより、バッファリングが抑制され、エラーも除外さThreadWorkerれます。

于 2012-08-03T11:41:56.687 に答える
0

あなたがする必要があるのは次のとおりです。

新しい行に各ステップを含むファイルを作成します。

例えば:

ステップ 1
ステップ 2
ステップ 3

次に、次のようにプログラムを呼び出します。

program.exe < 入力.txt

言っ途切れる!!

于 2013-11-14T15:50:19.270 に答える