9

非同期の「ブロードキャスト スタイル」メッセージングを管理するための Python クラス (できれば、サード パーティのライブラリではなく、標準言語の一部) を探しています。

メッセージをキューに入れる 1 つのスレッド (「putMessageOnQueue」メソッドはブロックしてはなりません) と、おそらくいくつかのブロッキング「waitForMessage」関数を呼び出して、すべてメッセージを待機する他の複数のスレッドを作成します。メッセージがキューに配置されると、待機中の各スレッドがメッセージの独自のコピーを取得する必要があります。

組み込みQueueクラスを見てきましたが、メッセージを消費するにはキューからそれらを削除する必要があるように見えるため、これは適切ではないと思います。

これは一般的なユースケースであるように思われますが、誰かが解決策を推奨できますか?

4

2 に答える 2

8

これに対する典型的なアプローチは、スレッドごとに個別のメッセージキューを使用し、そのようなメッセージの受信に関心を以前に登録したすべてのキューにメッセージをプッシュすることだと思います。

このようなものは機能するはずですが、テストされていないコードです...

from time import sleep
from threading import Thread
from Queue import Queue

class DispatcherThread(Thread):

   def __init__(self, *args, **kwargs):
       super(DispatcherThread, self).__init__(*args, **kwargs)
       self.interested_threads = []

   def run(self):
       while 1:
           if some_condition:
               self.dispatch_message(some_message)
           else:
               sleep(0.1)

   def register_interest(self, thread):
       self.interested_threads.append(thread)

   def dispatch_message(self, message):
       for thread in self.interested_threads:
           thread.put_message(message)



class WorkerThread(Thread):

   def __init__(self, *args, **kwargs):
       super(WorkerThread, self).__init__(*args, **kwargs)
       self.queue = Queue()


   def run(self):

       # Tell the dispatcher thread we want messages
       dispatcher_thread.register_interest(self)

       while 1:
           # Wait for next message
           message = self.queue.get()

           # Process message
           # ...

   def put_message(self, message):
       self.queue.put(message)


dispatcher_thread = DispatcherThread()
dispatcher_thread.start()

worker_threads = []
for i in range(10):
    worker_thread = WorkerThread()
    worker_thread.start()
    worker_threads.append(worker_thread)

dispatcher_thread.join()
于 2013-05-31T14:43:09.140 に答える
2

これはより単純な例だと思います ( Python Libの Queue の例から引用)

from threading import Thread
from Queue import Queue


num_worker_threads = 2

def worker():
    while True:
        item = q.get()
        do_work(item)
        q.task_done()

q = Queue()
for i in range(num_worker_threads):
     t = Thread(target=worker)
     t.daemon = True
     t.start()

for item in source():
    q.put(item)

q.join()       # block until all tasks are done
于 2015-09-08T11:41:18.957 に答える