0

バッファリングされていない出力を次の形式で出力するスクリプト(worker.py)があります...

1
2
3
.
.
.
n

ここで、nは、このスクリプトのループが行う一定の反復回数です。別のスクリプト(service_controller.py)で、いくつかのスレッドを開始します。各スレッドは、subprocess.Popen(stdout = subprocess.PIPE、...);を使用してサブプロセスを開始します。ここで、メインスレッド(service_controller.py)で、各スレッドのworker.pyサブプロセスの出力を読み取り、それを使用して、完了までの残り時間の見積もりを計算します。

私はworker.pyからstdoutを読み取り、最後に出力された数値を決定するすべてのロジックを動作させています。問題は、これをブロックしない方法で行う方法がわからないことです。定数bufsizeを読み取ると、各読み取りは各ワーカーからの同じデータを待機することになります。fcntl、select + os.readなどを使用するなど、さまざまな方法を試しました。ここでの最善のオプションは何ですか。必要に応じてソースを投稿できますが、説明で問題が十分に説明されていると思いました。

ここで助けてくれてありがとう。

編集
サンプルコードの追加

サブプロセスを開始するワーカーがいます。

class WorkerThread(threading.Thread):
    def __init__(self):
        self.completed = 0
        self.process = None
        self.lock = threading.RLock()
        threading.Thread.__init__(self)

    def run(self):
        cmd = ["/path/to/script", "arg1", "arg2"]
        self.process = subprocess.Popen(cmd, stdout=subprocess.PIPE, bufsize=1, shell=False)
        #flags = fcntl.fcntl(self.process.stdout, fcntl.F_GETFL)
        #fcntl.fcntl(self.process.stdout.fileno(), fcntl.F_SETFL, flags | os.O_NONBLOCK)

    def get_completed(self):
        self.lock.acquire();
        fd = select.select([self.process.stdout.fileno()], [], [], 5)[0]
        if fd:
            self.data += os.read(fd, 1)
            try:
                self.completed = int(self.data.split("\n")[-2])
            except IndexError:
                pass
        self.lock.release()
        return self.completed

次に、ThreadManagerがあります。

class ThreadManager():
    def __init__(self):
        self.pool = []
        self.running = []
        self.lock = threading.Lock()

    def clean_pool(self, pool):
        for worker in [x for x in pool is not x.isAlive()]:
            worker.join()
            pool.remove(worker)
            del worker
        return pool

    def run(self, concurrent=5):
        while len(self.running) + len(self.pool) > 0:
            self.clean_pool(self.running)
            n = min(max(concurrent - len(self.running), 0), len(self.pool))
            if n > 0:
                for worker in self.pool[0:n]:
                    worker.start()
                self.running.extend(self.pool[0:n])
                del self.pool[0:n]
            time.sleep(.01)
         for worker in self.running + self.pool:
             worker.join()

そしてそれを実行するためのいくつかのコード。

threadManager = ThreadManager()
for i in xrange(0, 5):
    threadManager.pool.append(WorkerThread())
threadManager.run()

問題を特定するために、他のコードのログを削除しました。

4

2 に答える 2

2

service_controllerをi/oアクセスによってブロックする代わりに、スレッドループのみが独自の制御されたプロセス出力を読み取る必要があります。

次に、最後にポーリングされた出力を取得するプロセスを制御するスレッドオブジェクトにメソッドを含めることができます。

もちろん、その場合は、スレッドがバッファを埋めるために使用するバッファと、コントローラがバッファを取得するために呼び出すメソッドの両方で使用されるバッファを保護するために、何らかのロックメカニズムを使用することを忘れないでください。

于 2010-03-18T15:04:01.613 に答える
1

メソッド WorkerThread.run() はサブプロセスを起動し、すぐに終了します。Run() は、サブプロセスが完了するまで、ポーリングを実行して WorkerThread.completed を更新する必要があります。

于 2011-09-21T06:24:36.627 に答える