30,000 以上のファイルを順次解析する python プログラムがあります。
これを複数のスレッドに分割し (これは正しい用語ですか?)、そのファイルのチャンクを同時に解析する方法はありますか? それぞれ 1000 個のファイルを解析する 30 個のアルゴリズムがあるとします。
30,000 以上のファイルを順次解析する python プログラムがあります。
これを複数のスレッドに分割し (これは正しい用語ですか?)、そのファイルのチャンクを同時に解析する方法はありますか? それぞれ 1000 個のファイルを解析する 30 個のアルゴリズムがあるとします。
かんたんだよ。
30 個のスレッドを明示的に作成し、それぞれに 1000 個のファイル名を付けることができます。
しかし、もっと簡単に、30 個のスレッドのプールを作成して、それらに 30000 個のファイル名を持つスレッドをサービスさせることができます。これにより、自動的にロード バランシングが行われます。一部のファイルが他のファイルよりもはるかに大きい場合、別のスレッドが 10% しか完了していないときに 1 つのスレッドが終了することはありません。
このconcurrent.futures
モジュールは、タスクを並行して実行する優れた方法を提供します (タスクに引数を渡し、結果を受け取り、必要に応じて例外を受け取ることも含みます)。Python 2.x または 3.1 を使用している場合は、 backport をインストールする必要がありますfutures
。次に、これを行うだけです:
with concurrent.futures.ThreadPoolExecutor(max_workers=30) as executor:
results = executor.map(parse_file, filenames)
さて、30 人の従業員はおそらく多すぎます。ハードドライブとそのドライバーに圧倒され、ほとんどのスレッドがディスクのシークを待機することになります。しかし、少数ならやる価値があるかもしれません。max_workers
また、タイミングを微調整してテストし、システムのスイート スポットがどこにあるかを確認するのは非常に簡単です。
コードが I/O 作業よりも多くの CPU 作業を行っている場合 (つまり、ディスクからの読み取りよりも文字列の解析や複雑な構造の構築などに多くの時間を費やしている場合)、少なくとも CPython では、スレッドは役に立ちません。 Global Interpreter Lock のためです。しかし、プロセスを使用することでそれを解決できます。
コードの観点からは、これは簡単です: に変更ThreadPoolExecutor
するだけProcessPoolExecutor
です。
ただし、大規模または複雑なデータ構造を返す場合は、プロセスの境界を越えてデータ構造をシリアル化するのにかかる時間が節約を食いつぶすか、それを圧倒する可能性さえあります。その場合は、より大きなジョブをバッチ処理することで改善できる場合があります。
def parse_files(filenames):
return [parse_file(filename) for filename in filenames]
with concurrent.futures.ThreadPoolExecutor(max_workers=30) as executor:
results = executor.map(parse_files, grouper(10, filenames))
ただし、場合によっては、より低いレベルに落として、multiprocessing
プロセス間メモリ共有などの機能を持つモジュールを使用する必要がある場合があります。
を使用できない/使用したくない場合futures
、2.6+multiprocessing.Pool
にはプレーンなプロセッサ プールと、同じインターフェイスを持つスレッド プール (文書化されてmultiprocessing.ThreadPool
いません) またはmultiprocessing.dummy.Pool
(文書化されていますが醜い) があります。
このような些細なケースでは、プレーン プールとエグゼキューターの間に違いはありません。そして、前述のように、非常に複雑なケースmultiprocessing
では、ボンネットの下に入ることができます。途中で、futures
多くの場合、より単純です。しかし、両方を学ぶ価値があります。