約61億(カスタム)のアイテムの最大重量を見つけようとしていますが、これを並列処理で実行したいと思います。私の特定のアプリケーションには、61億を超えるアイテムを繰り返す必要のない、より優れたアルゴリズムがありますが、それらを説明する教科書は私の頭上にあり、上司はこれを4日で完了することを望んでいます。私は自分の会社の豪華なサーバーと並列処理でより良いショットを持っていると思いました。ただし、並列処理について私が知っていることはすべて、Pythonの ドキュメントを読むことから得られます。つまり、私はかなり迷っています...
私の現在の理論は、フィーダープロセス、入力キュー、ワーカープロセスの全体(たとえば30)、および出力キューを設定することです(出力キューで最大の要素を見つけるのは簡単です)。私が理解していないのは、フィーダープロセスが、アイテムが入力キューを通過するのを待つのをいつ停止するかをワーカープロセスにどのように伝えることができるかということです。
multiprocessing.Pool.map_async
反復可能な6.1E9アイテムで使用することを考えていましたが、何もせずにアイテムを反復するだけで10分近くかかります。私が何かを誤解していない限りmap_async
、プロセスが作業を開始している間に、それらを繰り返し処理してプロセスに割り当てることができます。(Pool
も提供しますimap
が、ドキュメントmap
には、非同期で動作するようには見えないに似ていると記載されています。非同期が必要ですよね?)
関連する質問:concurrent.futures
代わりに使用しmultiprocessing
ますか?私は2キューシステムを実装する最初の人になることはできませんでした(それはまさにアメリカのすべてのデリのラインが機能する方法です...)それでこれを行うためのより多くのPythonic /組み込みの方法はありますか?
これが私がやろうとしていることの骨組みです。中央のコメントブロックを参照してください。
import multiprocessing as mp
import queue
def faucet(items, bathtub):
"""Fill bathtub, a process-safe queue, with 6.1e9 items"""
for item in items:
bathtub.put(item)
bathtub.close()
def drain_filter(bathtub, drain):
"""Put maximal item from bathtub into drain.
Bathtub and drain are process-safe queues.
"""
max_weight = 0
max_item = None
while True:
try:
current_item = bathtub.get()
# The following line three lines are the ones that I can't
# quite figure out how to trigger without a race condition.
# What I would love is to trigger them AFTER faucet calls
# bathtub.close and the bathtub queue is empty.
except queue.Empty:
drain.put((max_weight, max_item))
return
else:
bathtub.task_done()
if not item.is_relevant():
continue
current_weight = item.weight
if current_weight > max_weight:
max_weight = current_weight
max_item = current_item
def parallel_max(items, nprocs=30):
"""The elements of items should have a method `is_relevant`
and an attribute `weight`. `items` itself is an immutable
iterator object.
"""
bathtub_q = mp.JoinableQueue()
drain_q = mp.Queue()
faucet_proc = mp.Process(target=faucet, args=(items, bathtub_q))
worker_procs = mp.Pool(processes=nprocs)
faucet_proc.start()
worker_procs.apply_async(drain_filter, bathtub_q, drain_q)
finalists = []
for i in range(nprocs):
finalists.append(drain_q.get())
return max(finalists)
ここに答えがあります
私の質問に対する非常に徹底的な答えと、PythonFoundationのコミュニケーションディレクターであるDougHellmanによるマルチタスクの穏やかな紹介を見つけました。私が欲しかったのは「ポイズンピル」のパターンでした。ここでそれをチェックしてください:http://www.doughellmann.com/PyMOTW/multiprocessing/communication.html
その概念のカーネルを投稿するための@MRABへの小道具。