6

私は最近、タスクを高速化するためにマルチプロセッシングの実験を始めました。ファジー文字列マッチングを行い、さまざまなアルゴリズムを使用してスコアを計算するスクリプトを作成しました (さまざまなマッチング手法を比較したかったため)。完全なソースはhttps://bitbucket.org/bergonzzi/fuzzy-compare/srcにあります。入力として、ペアに結合された 2 つのファイルが必要です (file1 の各行と file2 の各行)。ペアごとに、あいまい一致スコアが計算されます。

3バージョン作りました。私のレポで提供されているサンプル データ (ペアに結合された後の 697.340 アイテムで構成されています) で実行すると、次のタイミングが得られます。

  • シンプルな単一プロセス - 0:00:47
  • Pool.map() を使用したマルチプロセス - 0:00:13
  • キューを使用したマルチプロセス (プロデューサー/コンシューマー パターン) - 0:01:04

Pool.map() バージョンが Queue バージョンよりもはるかに高速である理由を理解しようとしていますが、これは実際には単純な単一プロセスのものよりも遅いです。

Queues を使用しようとする理由は、Pool.map() バージョンがすべてが終了するまで結果を保持し、最後にファイルに書き込むだけだからです。これは、大きなファイルの場合、最終的に大量のメモリを消費することを意味します。私はこのバージョンについて話しています(ここに貼り付けるコードが多いため、リンクしています)。

これを解決するために、私はそれを生産者/消費者パターンにリファクタリングしました(または少なくとも試みました)。ここでは、最初に両方の入力ファイルを組み合わせてジョブを生成し、コンシューマーが処理するキューに入れます (あいまい一致スコアを計算します)。完了したジョブはアウト キューに入れられます。次に、このキューから完了したアイテムを取得してファイルに書き込む単一のプロセスがあります。このようにすると、理論的には、結果がディスクにフラッシュされるため、それほど多くのメモリは必要ありません。正常に動作しているように見えますが、はるかに遅いです。また、Mac OSX のアクティビティ モニターを見ると、生成している 4 つのプロセスが 100% の CPU を使用していないように見えることにも気付きました (Pool.map() バージョンではそうではありません)。

私が気付いたもう 1 つの点は、私のプロデューサー関数はキューを適切に満たすように見えますが、コンシューマー プロセスは、最初のアイテムが到着するとすぐに動作を開始するのではなく、キューがいっぱいになるまで待機しているように見えることです。私はおそらくそこで何か間違ったことをしている...

参考までに、Queue バージョンに関連するコードの一部を次に示します (ただし、上記のリンク先のレポで完全なコードを確認することをお勧めします)。

ここに私のプロデューサー関数があります:

def combine(list1, list2):
    '''
    Combine every item of list1 with every item of list 2,
    normalize put the pair in the job queue.
    '''
    pname = multiprocessing.current_process().name
    for x in list1:
        for y in list2:
            # slugify is a function to normalize the strings
            term1 = slugify(x.strip(), separator=' ')
            term2 = slugify(y.strip(), separator=' ')
            job_queue.put_nowait([term1, term2])

これはライター関数です。

def writer(writer_queue):
    out = open(file_out, 'wb')
    pname = multiprocessing.current_process().name
    out.write(header)
    for match in iter(writer_queue.get, "STOP"):
        print("%s is writing %s") % (pname, str(match))
        line = str(';'.join(match) + '\n')
        out.write(line)
    out.close()

これは、実際の計算を行うワーカー関数です (ここでは違いがないため、ほとんどのコードを削除しました。完全なソースはリポジトリにあります)。

def score_it(job_queue, writer_queue):
    '''Calculate scores for pair of words.'''
    pname = multiprocessing.current_process().name

    for pair in iter(job_queue.get_nowait, "STOP"):
        # do all the calculations and put the result into the writer queue
        writer_queue.put(result)

これは私がプロセスをセットアップする方法です:

# Files
to_match = open(args.file_to_match).readlines()
source_list = open(args.file_to_be_matched).readlines()

workers = 4
job_queue = multiprocessing.Manager().Queue()
writer_queue = multiprocessing.Manager().Queue()
processes = []

print('Start matching with "%s", minimum score of %s and %s workers') % (
    args.algorithm, minscore, workers)

# Fill up job queue
print("Filling up job queue with term pairs...")
c = multiprocessing.Process(target=combine, name="Feeder", args=(to_match, source_list))
c.start()
c.join()

print("Job queue size: %s") % job_queue.qsize()

# Start writer process
w = multiprocessing.Process(target=writer, name="Writer", args=(writer_queue,))
w.start()

for w in xrange(workers):
    p = multiprocessing.Process(target=score_it, args=(job_queue, writer_queue))
    p.start()
    processes.append(p)
    job_queue.put("STOP")

for p in processes:
    p.join()

writer_queue.put("STOP")

マルチプロセッシングが時々遅くなることについてここでかなり読んだことがありますが、これは新しいプロセスの作成と管理のオーバーヘッドに関係していることを知っています。また、実行するジョブが十分に「大きく」ない場合、マルチプロセッシングの効果が見えない場合があります。ただし、この場合、ジョブはかなり大きいと思います。また、Pool.map() バージョンは、はるかに高速であるため、それを証明しているようです。

これらすべてのプロセスを管理し、キュー オブジェクトを渡すときに、何か間違ったことをしているでしょうか? 実行中に必要なメモリの量を最小限に抑えるために、処理中に結果をファイルに書き込むことができるように、これを最適化するにはどうすればよいでしょうか?

ありがとう!

4

1 に答える 1

2

タイミングの問題は、マルチスレッド キューのバージョンに最適化が欠けていることだと思います。基本的に、ワーカー スレッドが job_queue からジョブを取得し始める前に job_queue がいっぱいになるというコメントを作成しました。これの理由は #Fill up job queue にある c.join() だと思います。これにより、ジョブ キューがいっぱいになるまでメイン スレッドが続行されなくなります。p.join() の後に c.join() を最後に移動します。また、停止フラグをキューの最後に配置する方法を見つける必要があります。結合関数は、これを配置するのに適した場所かもしれません。結合するデータがなくなった後、x 個の停止フラグを追加する行に沿ったもの。

注意すべきもう 1 つの点: p プロセスを開始する for ループのスコープ内で w 変数を上書きしています。スタイル/読みやすさなどの問題として、w を別の変数名に変更します。アンダースコアを使用しない場合は、使い捨ての変数名として有効です。いえ

for w in xrange(workers):

なるべき

for _ in xrange(workers):

簡単に言えば、 c.join() を最後に移動すると、より正確なタイミングが得られるはずです。現在、マルチスレッド化されているのは、文字列のあいまい一致だけです。プロデューサ/コンシューマ スレッドを使用する利点の 1 つは、コンシューマ スレッドがプロデューサ スレッドが終了するまで待機する必要がないため、最終的に使用するメモリが少なくなることです。

于 2014-12-10T16:12:22.873 に答える