5

Python で同時スレッド数を制限するにはどうすればよいですか?

たとえば、多くのファイルを含むディレクトリがあり、それらすべてを処理したいのですが、一度に 4 つだけを並列処理したいとします。

これが私がこれまでに持っているものです:

def process_file(fname):
        # open file and do something                                                                                            

def process_file_thread(queue, fname):
    queue.put(process_file(fname))

def process_all_files(d):
    files=glob.glob(d + '/*')
    q=Queue.Queue()
    for fname in files:
        t=threading.Thread(target=process_file_thread, args=(q, fname))
        t.start()
    q.join()

def main():
    process_all_files('.')
    # Do something after all files have been processed

一度に 4 つのスレッドのみが実行されるようにコードを変更するにはどうすればよいですか?

すべてのファイルが処理されるのを待ってから、処理されたファイルの作業を続行することに注意してください。

4

2 に答える 2

9

たとえば、多くのファイルを含むディレクトリがあり、それらすべてを処理したいのですが、一度に 4 つだけを並列処理したいとします。

それがまさにスレッド プールの機能です。ジョブを作成すると、プールは一度に 4 つのジョブを並行して実行します。エグゼキューターを使用すると、関数 (または他の呼び出し可能オブジェクト) を渡すだけで、結果の先物を返すだけで、物事をさらに簡単にすることができます。これらはすべて自分で構築できますが、そうする必要はありません。*

stdlib のconcurrent.futuresモジュールは、これを行う最も簡単な方法です。(Python 3.1 以前については、バックポートを参照してください。) 実際、主な例の 1 つは、やりたいことに非常に近いものです。しかし、それをあなたの正確なユースケースに適応させましょう:

def process_all_files(d):
    files = glob.glob(d + '/*')
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
        fs = [executor.submit(process_file, file) for file in files]
        concurrent.futures.wait(fs)

process_file何かを返したい場合は、ほぼ同じくらい簡単です。

def process_all_files(d):
    files = glob.glob(d + '/*')
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
        fs = [executor.submit(process_file, file) for file in files]
        for f in concurrent.futures.as_completed(fs):
            do_something(f.result())

また、例外も処理したい場合は、例を見てください。への呼び出しをtry/で囲んだだけです。exceptresult()


* 自分で作成する場合は、それほど難しくありません。へのソースmultiprocessing.poolはよく書かれており、コメントが付けられており、それほど複雑ではありません。難しいことのほとんどは、スレッド化には関係ありません。へのソースconcurrent.futuresはさらに単純です。

于 2013-08-21T01:03:46.360 に答える
0

私はこの手法を数回使用しましたが、少し醜い考えだと思います:

import threading

def process_something():
    something = list(get_something)

    def worker():
        while something:
            obj = something.pop()
            # do something with obj

   threads = [Thread(target=worker) for i in range(4)]
   [t.start() for t in threads]
   [t.join() for t in threads]
于 2016-10-19T17:59:04.980 に答える