8

これは、 Python3のマルチプロセッシングキューでの競合状態の回避に関する最近の質問の拡張です。うまくいけば、このバージョンの質問はより具体的です。

TL; DR:を使用してワーカープロセスがキューから供給されるマルチプロセッシングモデルでmultiprocessing.Queue、なぜ私のワーカープロセスはそれほどアイドル状態になっているのですか?各プロセスには独自の入力キューがあるため、共有キューのロックをめぐって互いに争うことはありませんが、キューは実際には空であるだけで多くの時間を費やします。メインプロセスはI/Oバウンドスレッドを実行しています-それは入力キューのCPUバウンドの充填を遅くしていますか?

特定の制約の下で、それぞれがM_i要素(0 <= i <Nの場合)を持つNセットのデカルト積の最大要素を見つけようとしています。デカルト積の要素は長さNのタプルであり、その要素はNセットの要素であることを思い出してください。これらのタプルを「組み合わせ」と呼び、元のセットのすべての組み合わせをループしているという事実を強調します。is_feasible関数がを返すとき、組み合わせは制約を満たしますTrue。私の問題では、要素の重みが最も大きい組み合わせを見つけようとしていますsum(element.weight for element in combination)

私の問題のサイズは大きいですが、私の会社のサーバーも大きいです。次のシリアルアルゴリズムを並列アルゴリズムとして書き直そうとしています。

from operator import itemgetter
from itertools import product # Cartesian product function from the std lib
def optimize(sets):
    """Return the largest (total-weight, combination) tuple from all
    possible combinations of the elements in the several sets, subject
    to the constraint that is_feasible(combo) returns True."""
    return max(
                map(
                    lambda combination: (
                        sum(element.weight for element in combination),
                        combination
                    ),
                    filter(
                        is_feasible, # Returns True if combo meets constraint
                        product(*sets)
                    )
                ),
                key=itemgetter(0) # Only maximize based on sum of weight
            )

私の現在のマルチプロセッシングアプローチは、ワーカープロセスを作成し、それらの組み合わせを入力キューでフィードすることです。労働者が毒薬を受け取ると、彼らは見た中で最高の組み合わせを出力キューに入れて終了します。メインプロセスのメインスレッドから入力キューを埋めます。この手法の利点の1つは、メインプロセスからセカンダリスレッドを生成して監視ツールを実行できることです(これまでに処理された組み合わせの数とキューの数を確認するために使用できるREPLだけです)。

                    +-----------+
            in_q0   |   worker0 |----\
            /-------+-----------+     \
+-----------+   in_q1   +-----------+  \ out_q  +-----------+
|   main    |-----------|   worker1 |-----------|   main    |
+-----------+           +-----------+  /        +-----------+
            \-------+-----------+     /
            in_q2   |   worker2 |----/
                    +-----------+

私は元々、すべてのワーカーが1つの入力キューから読み取っていましたが、いずれもCPUにヒットしていないことがわかりました。彼らがqueue.get()のブロックを解除するのを待つことにすべての時間を費やしていると考えて、私は彼らに独自のキューを与えました。それはCPUへのプレッシャーを増大させたので、私はワーカーがより頻繁にアクティブであると考えました。ただし、キューはほとんどの時間を空で過ごします。(私はこれを私が言及したモニタリングREPLから知っています)。これは、キューを埋めるメインループが遅いことを私に示唆しています。そのループは次のとおりです。

from itertools import cycle
main():
    # (Create workers, each with its own input queue)
    # Cycle through each worker's queue and add a combination to that queue
    for combo, worker in zip(product(*sets), cycle(workers)):
        worker.in_q.put(combo)
    # (Collect results and return)

ボトルネックはだと思いworker.in_q.put()ます。どうすればそれを速くできますか?私の最初の本能はワーカーを遅くすることでしたが、それは意味がありません...モニタースレッドがループを頻繁に停止しているという問題はありますか?どうすればわかりますか?

あるいは、ロックをそれほど待たずにこれを実装する別の方法はありますか?

4

1 に答える 1

4

あなたの要素はどのように見えますか?ピクルス化してキューに入れるのが遅い可能性があり、明らかにボトルネックになります。各要素が独立して何度も何度も酸洗いされていることに注意してください。

この場合、次のアプローチが役立つ場合があります。

  • カーディナリティ >= ワーカー数のセットを選択します。理想的には、それは労働者の数よりもはるかに多いでしょう。このセットを A と呼び、A のほぼ等しいサブセットを各ワーカーに割り当てます。そのサブセットを各ワーカーに送信します。
  • A 以外のすべてのセットの完全なコンテンツを各ワーカーに配布します (おそらく、pickle.dumps一度同じ文字列を各ワーカーに送信するか、共有メモリなどを介して)。
  • 次に、各ワーカーは、そのサブセットを実行するために必要な完全な情報を取得します。product(my_A_subset, *other_sets)各ジョブ間 (または 3 つのジョブごとなど) で何らかの停止信号をポーリングして (順序が異なる可能性があります)、陽気な方法で開始することができます。これはキューを介する必要はありません。1 ビットの共有メモリ値で問題ありません。
于 2012-05-20T01:19:02.547 に答える