9

誰かがLIFOに近い、またはFIFOに近い(たとえばランダムな)動作を取得するためのクリーンな方法を知っていmultiprocessing.Queueますか?

別の質問:誰かが、背後にある実際のストレージ構造を管理するスレッドのコードを教えてもらえmultiprocessing.Queueますか?ほぼLIFOアクセスを提供することはその中で些細なことのように思えますが、私はそれを見つけようとしてウサギの穴で迷子になりました。

ノート:

  1. multiprocessing.Queue 注文を保証するものではないと思います。罰金。しかし、それは近FIFOなので、近LIFOは素晴らしいでしょう。
  2. 現在のすべてのアイテムをキューから取り出して、それらを操作する前に順序を逆にすることもできますが、可能であれば、ごちゃごちゃしたものは避けたいと思います。

(編集)明確にするために:私はCPUバウンドシミュレーションを実行しmultiprocessingているため、からの特殊なキューを使用できませんQueue。数日間答えが見られなかったので、上記の別の質問を追加しました。


multiprocessing.Queueそれが問題である場合、以下はFIFOに近いわずかな証拠です。単純なケース(シングルスレッド)では、システム上で完全にFIFOであることを示しています。

import multiprocessing as mp
import Queue

q = mp.Queue()

for i in xrange(1000):
    q.put(i)

deltas = []
while True:
    try:
        value1 = q.get(timeout=0.1)
        value2 = q.get(timeout=0.1)
        deltas.append(value2-value1)
    except Queue.Empty:
        break

#positive deltas would indicate the numbers are coming out in increasing order
min_delta, max_delta = min(deltas), max(deltas)
avg_delta = sum(deltas)/len(deltas)

print "min", min_delta
print "max", max_delta
print "avg", avg_delta

印刷:最小、最大、および平均は正確に1(完全なFIFO)

4

2 に答える 2

3

Pythonインストールに存在するQueueクラスを調べましたLib/multiprocessing/queues.py(Python 2.7ですが、簡単に確認したPython 3.2とのバージョンに明らかな違いはありません)。これが私がそれが機能することを理解する方法です:

Queueオブジェクトによって維持されるオブジェクトの2つのセットがあります。1つのセットは、すべてのプロセスで共有されるマルチプロセスセーフなプリミティブです。その他は、各プロセスによって個別に作成および使用されます。

__init__クロスプロセスオブジェクトは、次のメソッドで設定されます。

  1. Pipe両端のオブジェクトは、およびとして保存されself._readerますself._writer
  2. キュー内のBoundedSemaphoreオブジェクトの数をカウント(およびオプションで制限)するオブジェクト。
  3. パイプを読み取るためのLockオブジェクト、およびWindows以外のプラットフォームでは書き込み用のオブジェクト。(これは、パイプへの書き込みがWindowsでは本質的にマルチプロセスセーフであるためだと思います。)

_after_forkプロセスごとのオブジェクトは、および_start_threadメソッドで設定されます。

  1. collections.dequeパイプへの書き込みをバッファリングするために使用されるオブジェクト。
  2. threading.conditionバッファが空でないときに信号を送るために使用されるオブジェクト。
  3. threading.Thread実際の書き込みを行うオブジェクト。遅延して作成されるため、特定のプロセスでキューへの書き込みが少なくとも1回要求されるまで存在しません。
  4. Finalizeプロセスが終了したときにものをクリーンアップするさまざまなオブジェクト。

キューからのAgetは非常に単純です。読み取りロックを取得し、セマフォをデクリメントして、パイプの読み取り端からオブジェクトを取得します。

Aputはもっと複雑です。複数のスレッドを使用します。呼び出し元putは条件のロックを取得し、そのオブジェクトをバッファーに追加して、ロックを解除する前に条件を通知します。また、セマフォをインクリメントし、まだ実行されていない場合はライタースレッドを起動します。

ライタースレッドは、_feedメソッド内で(キャンセルされるまで)永久にループします。バッファが空の場合、notempty条件を待機します。次に、バッファからアイテムを取得し、書き込みロック(存在する場合)を取得して、アイテムをパイプに書き込みます。


それで、それらすべてを考えると、LIFOキューを取得するように変更できますか?簡単ではないようです。パイプは本質的にFIFOオブジェクトであり、キューは全体的なFIFOの動作を保証できませんが(複数のプロセスからの書き込みの非同期性のため)、常にほとんどがFIFOになります。

コンシューマーが1つしかない場合はget、キューからオブジェクトを取得して、独自のプロセスローカルスタックに追加できます。共有メモリを使用すると、制限されたサイズのスタックはそれほど難しくはありませんが、マルチコンシューマスタックを実行するのは難しくなります。ロック、条件のペア(完全な状態と空の状態でのブロック/シグナリング用)、共有整数値(保持されている値の数用)、および適切なタイプの共有配列(値自体用)が必要です。

于 2012-08-21T19:48:17.963 に答える
1

QueueパッケージにはLIFOキューがあります(Python 3のキュー)これは、multiprocessingまたはmultiprocessing.queuesモジュールでは公開されません。

q = mp.Queue()行をプリントに置き換えてq = Queue.LifoQueue()実行します:最小、最大、平均は正確に-1です。

(また、1つのスレッドからのみアイテムを取得する場合は、常に正確にFIFO / LIFOの順序を取得する必要があると思います。)

于 2012-08-21T14:10:45.260 に答える