2

Python で書かれた高スループット分析の一環として、スタンドアロン プログラムで分析される複数のファイルを作成しようとしています。

for foo in X:
    write foo_file
    os.system(run_program foo_file)

15,000 個の個別のファイルの場合、複数のコアで実行できればより高速に実行できますが、サーバーを圧倒したくありません。複数のスレッドを OS で実行するように設定するにはどうすればよいですか。ランタイムは私の分野の外部プログラム標準によって定義されているため、プロセスの生成速度については心配していません。

スレッド化とマルチプロセッシングのドキュメントを見て、圧倒されました。

4

2 に答える 2

4

生成されるプロセスの総数を制限する簡単な方法は、マルチプロセッシング プールを使用することです。

マルチプロセッシング プールを示す簡単な例は次のとおりです。

test.py

from multiprocessing.pool import Pool
# @NOTE: The two imports below are for demo purposes and won't be necessary in
# your final program
import random
import time

def writeOut(index):
    """ A function which prints a start message, delays for a random interval and then
        prints a finish message
    """
    delay = random.randint(1,5)                                                                                                                                             
    print("Starting process #{0}".format(index))
    time.sleep(delay)
    print("Finished process #{0} which delayed for {1}s.".format(index, delay))

# Create a process pool with a maximum of 10 worker processes
pool = Pool(processes=10)
# Map our function to a data set - number 1 through 20
pool.map(writeOut, range(20))

次のような出力が得られるはずです。

[mike@tester ~]$ python test.py 
Starting process #0
Starting process #2
Starting process #3
Starting process #1
Starting process #4
Starting process #5
Starting process #6
Starting process #7
Starting process #8
Starting process #9
Finished process #2 which delayed for 1s.
Starting process #10
Finished process #7 which delayed for 1s.
Finished process #6 which delayed for 1s.
Starting process #11
Starting process #12
Finished process #9 which delayed for 2s.
Finished process #12 which delayed for 1s.
Starting process #13
Starting process #14
Finished process #1 which delayed for 3s.
Finished process #5 which delayed for 3s.
Starting process #15
Starting process #16
Finished process #8 which delayed for 3s.
Starting process #17
Finished process #4 which delayed for 4s.
Starting process #18
Finished process #10 which delayed for 3s.
Finished process #13 which delayed for 2s.
Starting process #19
Finished process #0 which delayed for 5s.
Finished process #3 which delayed for 5s.
Finished process #11 which delayed for 4s.
Finished process #15 which delayed for 2s.
Finished process #16 which delayed for 2s.
Finished process #18 which delayed for 2s.
Finished process #14 which delayed for 4s.
Finished process #17 which delayed for 5s.
Finished process #19 which delayed for 5s.

ご覧のとおり、最初の 10 個のプロセスが開始され、後続の各プロセスは、別のプロセス プール ワーカーが完了する (使用可能になる) とすぐに開始されます。(複数のスレッドではなく) 複数のプロセスを使用すると、グローバル インタープリター ロック (GIL)がバイパスされます。

このサンプル コードをタスクで動作させるには、ファイル出力関数を作成し、それと、書き込み先のファイル データの iterable を and の代わりに渡す必要がありpool.map()ます。writeOutrange(20)

于 2013-01-30T07:19:37.400 に答える
1

これを試して:

class ThreadWriteFile(threading.Thread):
    def __init__(self, queue_to_write, queue_to_run):
        threading.Thread.__init__(self)
        self.queue_to_write = queue_to_write
        self.queue_to_run = queue_to_run

    def run(self):
        while True:
            foo_file = self.queue_to_write.get()
            write foo_file
            self.queue_to_run.put(foo_file)
            self.queue_to_write.task_done()

class ThreadRunProgram(threading.Thread):
    def __init__(self, queue_to_run):
        threading.Thread.__init__(self)
        self.queue_to_run = queue_to_run

    def run(self):
        while True:
            foo_file = self.queue_to_run.get()
            os.system(run_program foo_file)
            self.queue_to_run.task_done()

queue_to_write = Queue.Queue()
queue_to_run = Queue.Queue()

for foo in X:
    twf = ThreadWriteFile(queue_to_write, queue_to_run)
    twf.daemon = True
    twf.start()
    queue_to_write.put(foo)

    trf = ThreadRunProgram(queue_to_run)
    trf.daemon = True
    trf.start()

queue_to_write.join()
queue_to_run.join()
于 2013-01-30T06:58:31.250 に答える