3

私はそのようなものであるPythonスクリプトを持っています:

def test_run():
     global files_dir
     for f1 in os.listdir(files_dir):
          for f2 os.listdir(files_dir):
               os.system("run program x on f1 and f2")

os.system異なるプロセッサで各呼び出しを呼び出す最良の方法は何ですか? サブプロセスまたはマルチプロセッシングプールを使用していますか?

注 : プログラムを実行するたびに、出力ファイルが生成されます。

4

2 に答える 2

9

@unutbuの答えは問題ありませんが、それを行うためのより混乱の少ない方法があります.aを使用しPoolてタスクを渡します。そうすれば、独自のキューをいじる必要はありません。例えば、

import os
NUM_CPUS = None  # defaults to all available

def worker(f1, f2):
    os.system("run program x on f1 and f2")

def test_run(pool):
     filelist = os.listdir(files_dir)
     for f1 in filelist:
          for f2 in filelist:
               pool.apply_async(worker, args=(f1, f2))

if __name__ == "__main__":
     import multiprocessing as mp
     pool = mp.Pool(NUM_CPUS)
     test_run(pool)
     pool.close()
     pool.join()

それは、あなたが始めたコードに「よく似ています」。これは必ずしも良いことではありません;-)

Python 3 の最近のバージョンでは、Poolオブジェクトをコンテキスト マネージャーとしても使用できるため、末尾を次のように減らすことができます。

if __name__ == "__main__":
     import multiprocessing as mp
     with mp.Pool(NUM_CPUS) as pool:
         test_run(pool)

EDIT:代わりにconcurrent.futuresを使用

このような非常に単純なタスクの場合、Python 3 のconcurrent.futures方が使いやすい場合があります。test_run()上記のコードを次のように上から下に置き換えます。

def test_run():
     import concurrent.futures as cf
     filelist = os.listdir(files_dir)
     with cf.ProcessPoolExecutor(NUM_CPUS) as pp:
         for f1 in filelist:
             for f2 in filelist:
                 pp.submit(worker, f1, f2)

if __name__ == "__main__":
     test_run()

ワーカー プロセスの例外を静かに消滅させたくない場合は、より洗練されたものにする必要があります。これは、すべての並列処理ギミックで潜在的な問題です。問題は、通常、メイン プログラムで例外を発生させる良い方法がないことです。例外は、その時点でメイン プログラムが行っていることとは関係のないコンテキスト (ワーカー プロセス) で発生するためです。メイン プログラムで例外を (再) 発生させる 1 つの方法は、結果を明示的に要求することです。たとえば、上記を次のように変更します。

def test_run():
     import concurrent.futures as cf
     filelist = os.listdir(files_dir)
     futures = []
     with cf.ProcessPoolExecutor(NUM_CPUS) as pp:
         for f1 in filelist:
             for f2 in filelist:
                 futures.append(pp.submit(worker, f1, f2))
     for future in cf.as_completed(futures):
         future.result()

次に、ワーカー プロセスで例外が発生した場合、失敗したプロセス間呼び出しを表すオブジェクトfuture.result()に例外が適用されると、メイン プログラムでその例外が再発生します。Future

おそらく、この時点であなたが知りたいと思っていた以上のものです;-)

于 2013-12-29T01:44:10.033 に答える