2

ラウンドロビンで200個のファイルに対してプログラムを実行する必要があります。

今、私はそれらを次のように実行しています:

for combo in it.combinations(files, 2):
    cmd = ["command", combo[0], combo[1]]
    subprocess.Popen(cmd)

コンピューターを圧倒しないように、一度に60とだけ実行したいのですが、コマンドはかなりプロセッサーを集中的に使用します。60個のプロセスが実行されたらループを一時停止し、1つが終了したらループを再開して、常に60個のプロセスが実行されるようにするための最良の方法は何ですか?

4

4 に答える 4

4
#!/usr/bin/env python
import itertools
import subprocess
from multiprocessing.dummy import Pool # use threads

def run(combo):
    cmd = ["command", combo[0], combo[1]]
    return combo, subprocess.call(cmd)

def main():
    p = Pool(60) # 60 subprocesses at a time
    for combo, rc in p.imap_unordered(run, itertools.combinations(files, 2)):
        print("%s exited with %s" % (combo, rc))
    p.close()
    p.join()

if __name__ == "__main__":
    main()

この回答は、同時サブプロセスの数を制限するためのさまざまな手法を示しています。これは、multiprocessing.Pool、concurrent.futures、スレッド化+キューベースのソリューションを示しています。

于 2013-03-11T15:49:43.707 に答える
1

これは役立つかもしれません:

import itertools as it
import time
import subprocess

files = range(5)
max_load = 3
sleep_interval = 0.5

pid_list = []
for combo in it.combinations(files, 2):
  # Random command that takes time
  cmd = ['sleep', str(combo[0]+combo[1])]

  # Launch and record this command
  print "Launching: ", cmd
  pid = subprocess.Popen(cmd)
  pid_list.append(pid)

  # Deal with condtion of exceeding maximum load
  while len(filter(lambda x: x.poll() is None, pid_list)) >= max_load:
    time.sleep(sleep_interval)
于 2013-03-11T15:25:24.987 に答える
0

あなたは本当に簡単なことをすることができます:

from time import sleep

count = 0
for combo in it.combinations(files, 2):
    while count < 60:
        cmd = ["command", combo[0], combo[1]]
        subprocess.Popen(cmd)
        count = count + 1
        if subprocess_is_done:
            count = count - 1
    sleep(5)

明らかに、コマンドから取得する方法を理解する必要がありsubprocess_is_doneます。

これは、私が知る限り、些細なケースでも機能しますが、何を実行しようとしているのかわかりません...

于 2013-03-11T15:17:58.057 に答える
0

あなたはこのようなものが欲しいです:

import socket
import threading
import Queue
import subprocess

class IPThread(threading.Thread):
    def __init__(self, queue, num):
        super(IPThread, self).__init__()
        self.queue = queue
        self.num = num
    def run(self):
        while True:
            try:
                args = self.queue.get_nowait()
                cmd = ["echo"] + [str(i) for i in args]
                p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
                out, err = p.communicate()
                print out
            except Queue.Empty:
                # Nothing left in the Queue -- we are done
                print "Queue %d done" % self.num
                break
            except Exception as err:
                # Handle exception
                print err
            self.queue.task_done()

def create_threads(q, size):
    for i in range(size):
        thread = IPThread(q, i)
        thread.setDaemon(True)
        thread.start()
    q.join()

def fill_queue(q):
    # Call q.put(args) in a loop to populate Queue with arguments
    from itertools import permutations
    x = list(range(20))
    for arg1, arg2 in permutations(x, 2):
        q.put([arg1, arg2])
    print q.qsize()

def main():
    q = Queue.Queue()
    fill_queue(q)
    create_threads(q, 60)
    print "Done"

if __name__ == '__main__':
    main()

作業するもののキューを作成します。Threadから派生したクラスを特殊化します。スレッドをスピンアップします。それらが完了するのを待ちます。

タスクの出力が互いに干渉しているため、タスクが同時に実行されていることがわかります。それは機能です!

于 2013-03-11T15:16:09.847 に答える