両方が同じシステムに存在する2 つの (マルチプロセッシング) Queuesのいずれかで何かが利用可能になるまで (回転せずに) 待機する最良の方法は何ですか?
9 に答える
実際には、select.select で multiprocessing.Queue オブジェクトを使用できます。すなわち
que = multiprocessing.Queue()
(input,[],[]) = select.select([que._reader],[],[])
読み取る準備ができている場合にのみ que を選択します。
ただし、それに関するドキュメントはありません。multiprocessing.queue ライブラリ (Linux では、通常は /usr/lib/python2.6/multiprocessing/queue.py のような sth です) のソース コードを読んで見つけました。
Queue.Queue では、これを行うスマートな方法を見つけられませんでした (そして、私は本当にやりたいと思っています)。
これを処理する公式の方法はまだないようです。または、少なくとも、これに基づいていない:
この投稿が行っているようなことを試すことができます-基になるパイプファイルハンドルにアクセスします:
次に、selectを使用します。
マルチプロセッシング キューでの選択が Windows でどの程度うまく機能するかはわかりません。Windows の select はファイル ハンドルではなくソケットをリッスンするため、問題が発生する可能性があると思われます。
私の答えは、ブロッキング方式で各キューをリッスンするスレッドを作成し、結果をすべてメイン スレッドがリッスンする単一のキューに入れ、本質的に個々のキューを単一のキューに多重化することです。
これを行うための私のコードは次のとおりです。
"""
Allow multiple queues to be waited upon.
queue,value = multiq.select(list_of_queues)
"""
import queue
import threading
class queue_reader(threading.Thread):
def __init__(self,inq,sharedq):
threading.Thread.__init__(self)
self.inq = inq
self.sharedq = sharedq
def run(self):
while True:
data = self.inq.get()
print ("thread reads data=",data)
result = (self.inq,data)
self.sharedq.put(result)
class multi_queue(queue.Queue):
def __init__(self,list_of_queues):
queue.Queue.__init__(self)
for q in list_of_queues:
qr = queue_reader(q,self)
qr.start()
def select(list_of_queues):
outq = queue.Queue()
for q in list_of_queues:
qr = queue_reader(q,outq)
qr.start()
return outq.get()
次のテスト ルーチンは、その使用方法を示しています。
import multiq
import queue
q1 = queue.Queue()
q2 = queue.Queue()
q3 = multiq.multi_queue([q1,q2])
q1.put(1)
q2.put(2)
q1.put(3)
q1.put(4)
res=0
while not res==4:
while not q3.empty():
res = q3.get()[1]
print ("returning result =",res)
お役に立てれば。
トニー・ウォレス
プラットフォームに依存しない方法でマルチプロセッシングを使用する場合、着信アイテムを単一の Queue に転送してから待機するスレッドを使用することが実用的な選択肢のようです。
スレッドを回避するには、低レベルのパイプ/FD を処理する必要があります。これは、プラットフォーム固有であり、高レベルの API で一貫して処理するのは容易ではありません。
または、コールバックを設定する機能を備えたキューが必要になります。これは、適切な高レベルのインターフェイスであると考えられます。つまり、次のように記述します。
シングルキュー = キュー() incoming_queue1.setcallback(singlequeue.put) incoming_queue2.setcallback(singlequeue.put) ... singlequeue.get()
おそらくマルチプロセッシング パッケージがこの API を拡張する可能性がありますが、まだそこにはありません。この概念は、「キュー」の代わりに「チャネル」という用語を使用する py.execnet でうまく機能します。こちらを参照してくださいhttp://tinyurl.com/nmtr4w
上記のコードの新しいバージョン...
マルチプロセッシング キューでの選択が Windows でどの程度うまく機能するかはわかりません。Windows の select はファイル ハンドルではなくソケットをリッスンするため、問題が発生する可能性があると思われます。
私の答えは、ブロッキング方式で各キューをリッスンするスレッドを作成し、結果をすべてメイン スレッドがリッスンする単一のキューに入れ、本質的に個々のキューを単一のキューに多重化することです。
これを行うための私のコードは次のとおりです。
"""
Allow multiple queues to be waited upon.
An EndOfQueueMarker marks a queue as
"all data sent on this queue".
When this marker has been accessed on
all input threads, this marker is returned
by the multi_queue.
"""
import queue
import threading
class EndOfQueueMarker:
def __str___(self):
return "End of data marker"
pass
class queue_reader(threading.Thread):
def __init__(self,inq,sharedq):
threading.Thread.__init__(self)
self.inq = inq
self.sharedq = sharedq
def run(self):
q_run = True
while q_run:
data = self.inq.get()
result = (self.inq,data)
self.sharedq.put(result)
if data is EndOfQueueMarker:
q_run = False
class multi_queue(queue.Queue):
def __init__(self,list_of_queues):
queue.Queue.__init__(self)
self.qList = list_of_queues
self.qrList = []
for q in list_of_queues:
qr = queue_reader(q,self)
qr.start()
self.qrList.append(qr)
def get(self,blocking=True,timeout=None):
res = []
while len(res)==0:
if len(self.qList)==0:
res = (self,EndOfQueueMarker)
else:
res = queue.Queue.get(self,blocking,timeout)
if res[1] is EndOfQueueMarker:
self.qList.remove(res[0])
res = []
return res
def join(self):
for qr in self.qrList:
qr.join()
def select(list_of_queues):
outq = queue.Queue()
for q in list_of_queues:
qr = queue_reader(q,outq)
qr.start()
return outq.get()
次のコードは、それがどのように機能するかを示すための私のテスト ルーチンです。
import multiq
import queue
q1 = queue.Queue()
q2 = queue.Queue()
q3 = multiq.multi_queue([q1,q2])
q1.put(1)
q2.put(2)
q1.put(3)
q1.put(4)
q1.put(multiq.EndOfQueueMarker)
q2.put(multiq.EndOfQueueMarker)
res=0
have_data = True
while have_data:
res = q3.get()[1]
print ("returning result =",res)
have_data = not(res==multiq.EndOfQueueMarker)
Observerパターンのようなものを使用できます。このパターンでは、Queue サブスクライバーに状態の変化が通知されます。
この場合、ワーカー スレッドを各キューのリスナーとして指定することができます。これにより、ready シグナルを受信するたびに新しい項目で動作し、それ以外の場合はスリープ状態になります。
やらないでください。
メッセージにヘッダーを付けて、共通のキューに送信します。これにより、コードが簡素化され、全体的にきれいになります。