12

約61億(カスタム)のアイテムの最大重量を見つけようとしていますが、これを並列処理で実行したいと思います。私の特定のアプリケーションには、61億を超えるアイテムを繰り返す必要のない、より優れたアルゴリズムがありますが、それらを説明する教科書は私の頭上にあり、上司はこれを4日で完了することを望んでいます。私は自分の会社の豪華なサーバーと並列処理でより良いショットを持っていると思いました。ただし、並列処理について私が知っていることはすべて、Pythonの ドキュメントを読むことから得られます。つまり、私はかなり迷っています...

私の現在の理論は、フィーダープロセス、入力キュー、ワーカープロセスの全体(たとえば30)、および出力キューを設定することです(出力キューで最大の要素を見つけるのは簡単です)。私が理解していないのは、フィーダープロセスが、アイテムが入力キューを通過するのを待つのをいつ停止するかをワーカープロセスにどのように伝えることができるかということです。

multiprocessing.Pool.map_async反復可能な6.1E9アイテムで使用することを考えていましたが、何もせずにアイテムを反復するだけで10分近くかかります。私が何かを誤解していない限りmap_async、プロセスが作業を開始している間に、それらを繰り返し処理してプロセスに割り当てることができます。(Poolも提供しますimapが、ドキュメントmapには、非同期で動作するようには見えないに似ていると記載されています。非同期が必要ですよね?

関連する質問concurrent.futures代わりに使用しmultiprocessingますか?私は2キューシステムを実装する最初の人になることはできませんでした(それはまさにアメリカのすべてのデリのラインが機能する方法です...)それでこれを行うためのより多くのPythonic /組み込みの方法はありますか?

これが私がやろうとしていることの骨組みです。中央のコメントブロックを参照してください。

import multiprocessing as mp
import queue

def faucet(items, bathtub):
    """Fill bathtub, a process-safe queue, with 6.1e9 items"""
    for item in items:
        bathtub.put(item)
    bathtub.close()

def drain_filter(bathtub, drain):
    """Put maximal item from bathtub into drain.
    Bathtub and drain are process-safe queues.
    """
    max_weight = 0
    max_item = None
    while True:
        try:
            current_item = bathtub.get()
        # The following line three lines are the ones that I can't
        # quite figure out how to trigger without a race condition.
        # What I would love is to trigger them AFTER faucet calls
        # bathtub.close and the bathtub queue is empty.
        except queue.Empty:
            drain.put((max_weight, max_item))
            return
        else:
            bathtub.task_done()
        if not item.is_relevant():
            continue
        current_weight = item.weight
        if current_weight > max_weight:
            max_weight = current_weight
            max_item = current_item

def parallel_max(items, nprocs=30):
    """The elements of items should have a method `is_relevant`
    and an attribute `weight`. `items` itself is an immutable
    iterator object.
    """
    bathtub_q = mp.JoinableQueue()
    drain_q = mp.Queue()

    faucet_proc = mp.Process(target=faucet, args=(items, bathtub_q))
    worker_procs = mp.Pool(processes=nprocs)

    faucet_proc.start()
    worker_procs.apply_async(drain_filter, bathtub_q, drain_q)

    finalists = []
    for i in range(nprocs):
        finalists.append(drain_q.get())

    return max(finalists)


ここに答えがあります

私の質問に対する非常に徹底的な答えと、PythonFoundationのコミュニケーションディレクターであるDougHellmanによるマルチタスクの穏やかな紹介を見つけました。私が欲しかったのは「ポイズンピル」のパターンでした。ここでそれをチェックしてください:http://www.doughellmann.com/PyMOTW/multiprocessing/communication.html

その概念のカーネルを投稿するための@MRABへの小道具。

4

1 に答える 1

6

Noneなどの特別な終了アイテムをキューに入れることができます。作業者がそれを見ると、他の作業者が見ることができるように元に戻して、終了することができます。または、ワーカーごとに1つの特別な終了アイテムをキューに入れることもできます。

于 2012-05-15T19:59:27.610 に答える