これは、 Python3のマルチプロセッシングキューでの競合状態の回避に関する最近の質問の拡張です。うまくいけば、このバージョンの質問はより具体的です。
TL; DR:を使用してワーカープロセスがキューから供給されるマルチプロセッシングモデルでmultiprocessing.Queue
、なぜ私のワーカープロセスはそれほどアイドル状態になっているのですか?各プロセスには独自の入力キューがあるため、共有キューのロックをめぐって互いに争うことはありませんが、キューは実際には空であるだけで多くの時間を費やします。メインプロセスはI/Oバウンドスレッドを実行しています-それは入力キューのCPUバウンドの充填を遅くしていますか?
特定の制約の下で、それぞれがM_i要素(0 <= i <Nの場合)を持つNセットのデカルト積の最大要素を見つけようとしています。デカルト積の要素は長さNのタプルであり、その要素はNセットの要素であることを思い出してください。これらのタプルを「組み合わせ」と呼び、元のセットのすべての組み合わせをループしているという事実を強調します。is_feasible
関数がを返すとき、組み合わせは制約を満たしますTrue
。私の問題では、要素の重みが最も大きい組み合わせを見つけようとしていますsum(element.weight for element in combination)
。
私の問題のサイズは大きいですが、私の会社のサーバーも大きいです。次のシリアルアルゴリズムを並列アルゴリズムとして書き直そうとしています。
from operator import itemgetter
from itertools import product # Cartesian product function from the std lib
def optimize(sets):
"""Return the largest (total-weight, combination) tuple from all
possible combinations of the elements in the several sets, subject
to the constraint that is_feasible(combo) returns True."""
return max(
map(
lambda combination: (
sum(element.weight for element in combination),
combination
),
filter(
is_feasible, # Returns True if combo meets constraint
product(*sets)
)
),
key=itemgetter(0) # Only maximize based on sum of weight
)
私の現在のマルチプロセッシングアプローチは、ワーカープロセスを作成し、それらの組み合わせを入力キューでフィードすることです。労働者が毒薬を受け取ると、彼らは見た中で最高の組み合わせを出力キューに入れて終了します。メインプロセスのメインスレッドから入力キューを埋めます。この手法の利点の1つは、メインプロセスからセカンダリスレッドを生成して監視ツールを実行できることです(これまでに処理された組み合わせの数とキューの数を確認するために使用できるREPLだけです)。
+-----------+
in_q0 | worker0 |----\
/-------+-----------+ \
+-----------+ in_q1 +-----------+ \ out_q +-----------+
| main |-----------| worker1 |-----------| main |
+-----------+ +-----------+ / +-----------+
\-------+-----------+ /
in_q2 | worker2 |----/
+-----------+
私は元々、すべてのワーカーが1つの入力キューから読み取っていましたが、いずれもCPUにヒットしていないことがわかりました。彼らがqueue.get()のブロックを解除するのを待つことにすべての時間を費やしていると考えて、私は彼らに独自のキューを与えました。それはCPUへのプレッシャーを増大させたので、私はワーカーがより頻繁にアクティブであると考えました。ただし、キューはほとんどの時間を空で過ごします。(私はこれを私が言及したモニタリングREPLから知っています)。これは、キューを埋めるメインループが遅いことを私に示唆しています。そのループは次のとおりです。
from itertools import cycle
main():
# (Create workers, each with its own input queue)
# Cycle through each worker's queue and add a combination to that queue
for combo, worker in zip(product(*sets), cycle(workers)):
worker.in_q.put(combo)
# (Collect results and return)
ボトルネックはだと思いworker.in_q.put()
ます。どうすればそれを速くできますか?私の最初の本能はワーカーを遅くすることでしたが、それは意味がありません...モニタースレッドがループを頻繁に停止しているという問題はありますか?どうすればわかりますか?
あるいは、ロックをそれほど待たずにこれを実装する別の方法はありますか?