@unutbuの答えは問題ありませんが、それを行うためのより混乱の少ない方法があります.aを使用しPool
てタスクを渡します。そうすれば、独自のキューをいじる必要はありません。例えば、
import os
NUM_CPUS = None # defaults to all available
def worker(f1, f2):
os.system("run program x on f1 and f2")
def test_run(pool):
filelist = os.listdir(files_dir)
for f1 in filelist:
for f2 in filelist:
pool.apply_async(worker, args=(f1, f2))
if __name__ == "__main__":
import multiprocessing as mp
pool = mp.Pool(NUM_CPUS)
test_run(pool)
pool.close()
pool.join()
それは、あなたが始めたコードに「よく似ています」。これは必ずしも良いことではありません;-)
Python 3 の最近のバージョンでは、Pool
オブジェクトをコンテキスト マネージャーとしても使用できるため、末尾を次のように減らすことができます。
if __name__ == "__main__":
import multiprocessing as mp
with mp.Pool(NUM_CPUS) as pool:
test_run(pool)
EDIT:代わりにconcurrent.futuresを使用
このような非常に単純なタスクの場合、Python 3 のconcurrent.futures
方が使いやすい場合があります。test_run()
上記のコードを次のように上から下に置き換えます。
def test_run():
import concurrent.futures as cf
filelist = os.listdir(files_dir)
with cf.ProcessPoolExecutor(NUM_CPUS) as pp:
for f1 in filelist:
for f2 in filelist:
pp.submit(worker, f1, f2)
if __name__ == "__main__":
test_run()
ワーカー プロセスの例外を静かに消滅させたくない場合は、より洗練されたものにする必要があります。これは、すべての並列処理ギミックで潜在的な問題です。問題は、通常、メイン プログラムで例外を発生させる良い方法がないことです。例外は、その時点でメイン プログラムが行っていることとは関係のないコンテキスト (ワーカー プロセス) で発生するためです。メイン プログラムで例外を (再) 発生させる 1 つの方法は、結果を明示的に要求することです。たとえば、上記を次のように変更します。
def test_run():
import concurrent.futures as cf
filelist = os.listdir(files_dir)
futures = []
with cf.ProcessPoolExecutor(NUM_CPUS) as pp:
for f1 in filelist:
for f2 in filelist:
futures.append(pp.submit(worker, f1, f2))
for future in cf.as_completed(futures):
future.result()
次に、ワーカー プロセスで例外が発生した場合、失敗したプロセス間呼び出しを表すオブジェクトfuture.result()
に例外が適用されると、メイン プログラムでその例外が再発生します。Future
おそらく、この時点であなたが知りたいと思っていた以上のものです;-)