私は最近、タスクを高速化するためにマルチプロセッシングの実験を始めました。ファジー文字列マッチングを行い、さまざまなアルゴリズムを使用してスコアを計算するスクリプトを作成しました (さまざまなマッチング手法を比較したかったため)。完全なソースはhttps://bitbucket.org/bergonzzi/fuzzy-compare/srcにあります。入力として、ペアに結合された 2 つのファイルが必要です (file1 の各行と file2 の各行)。ペアごとに、あいまい一致スコアが計算されます。
3バージョン作りました。私のレポで提供されているサンプル データ (ペアに結合された後の 697.340 アイテムで構成されています) で実行すると、次のタイミングが得られます。
- シンプルな単一プロセス - 0:00:47
- Pool.map() を使用したマルチプロセス - 0:00:13
- キューを使用したマルチプロセス (プロデューサー/コンシューマー パターン) - 0:01:04
Pool.map() バージョンが Queue バージョンよりもはるかに高速である理由を理解しようとしていますが、これは実際には単純な単一プロセスのものよりも遅いです。
Queues を使用しようとする理由は、Pool.map() バージョンがすべてが終了するまで結果を保持し、最後にファイルに書き込むだけだからです。これは、大きなファイルの場合、最終的に大量のメモリを消費することを意味します。私はこのバージョンについて話しています(ここに貼り付けるコードが多いため、リンクしています)。
これを解決するために、私はそれを生産者/消費者パターンにリファクタリングしました(または少なくとも試みました)。ここでは、最初に両方の入力ファイルを組み合わせてジョブを生成し、コンシューマーが処理するキューに入れます (あいまい一致スコアを計算します)。完了したジョブはアウト キューに入れられます。次に、このキューから完了したアイテムを取得してファイルに書き込む単一のプロセスがあります。このようにすると、理論的には、結果がディスクにフラッシュされるため、それほど多くのメモリは必要ありません。正常に動作しているように見えますが、はるかに遅いです。また、Mac OSX のアクティビティ モニターを見ると、生成している 4 つのプロセスが 100% の CPU を使用していないように見えることにも気付きました (Pool.map() バージョンではそうではありません)。
私が気付いたもう 1 つの点は、私のプロデューサー関数はキューを適切に満たすように見えますが、コンシューマー プロセスは、最初のアイテムが到着するとすぐに動作を開始するのではなく、キューがいっぱいになるまで待機しているように見えることです。私はおそらくそこで何か間違ったことをしている...
参考までに、Queue バージョンに関連するコードの一部を次に示します (ただし、上記のリンク先のレポで完全なコードを確認することをお勧めします)。
ここに私のプロデューサー関数があります:
def combine(list1, list2):
'''
Combine every item of list1 with every item of list 2,
normalize put the pair in the job queue.
'''
pname = multiprocessing.current_process().name
for x in list1:
for y in list2:
# slugify is a function to normalize the strings
term1 = slugify(x.strip(), separator=' ')
term2 = slugify(y.strip(), separator=' ')
job_queue.put_nowait([term1, term2])
これはライター関数です。
def writer(writer_queue):
out = open(file_out, 'wb')
pname = multiprocessing.current_process().name
out.write(header)
for match in iter(writer_queue.get, "STOP"):
print("%s is writing %s") % (pname, str(match))
line = str(';'.join(match) + '\n')
out.write(line)
out.close()
これは、実際の計算を行うワーカー関数です (ここでは違いがないため、ほとんどのコードを削除しました。完全なソースはリポジトリにあります)。
def score_it(job_queue, writer_queue):
'''Calculate scores for pair of words.'''
pname = multiprocessing.current_process().name
for pair in iter(job_queue.get_nowait, "STOP"):
# do all the calculations and put the result into the writer queue
writer_queue.put(result)
これは私がプロセスをセットアップする方法です:
# Files
to_match = open(args.file_to_match).readlines()
source_list = open(args.file_to_be_matched).readlines()
workers = 4
job_queue = multiprocessing.Manager().Queue()
writer_queue = multiprocessing.Manager().Queue()
processes = []
print('Start matching with "%s", minimum score of %s and %s workers') % (
args.algorithm, minscore, workers)
# Fill up job queue
print("Filling up job queue with term pairs...")
c = multiprocessing.Process(target=combine, name="Feeder", args=(to_match, source_list))
c.start()
c.join()
print("Job queue size: %s") % job_queue.qsize()
# Start writer process
w = multiprocessing.Process(target=writer, name="Writer", args=(writer_queue,))
w.start()
for w in xrange(workers):
p = multiprocessing.Process(target=score_it, args=(job_queue, writer_queue))
p.start()
processes.append(p)
job_queue.put("STOP")
for p in processes:
p.join()
writer_queue.put("STOP")
マルチプロセッシングが時々遅くなることについてここでかなり読んだことがありますが、これは新しいプロセスの作成と管理のオーバーヘッドに関係していることを知っています。また、実行するジョブが十分に「大きく」ない場合、マルチプロセッシングの効果が見えない場合があります。ただし、この場合、ジョブはかなり大きいと思います。また、Pool.map() バージョンは、はるかに高速であるため、それを証明しているようです。
これらすべてのプロセスを管理し、キュー オブジェクトを渡すときに、何か間違ったことをしているでしょうか? 実行中に必要なメモリの量を最小限に抑えるために、処理中に結果をファイルに書き込むことができるように、これを最適化するにはどうすればよいでしょうか?
ありがとう!