最初に を定義しますQueue
:
from Queue import Queue
q = Queue()
次に、スレッドで、そのキューからアイテムを取得しようとします。
msg = q.get()
これにより、キューに何かが見つかるまでスレッド全体がブロックされます。
同時に、着信イベントがコールバックをトリガーすることによって通知されると仮定すると、受信した RabbitMQ メッセージをキューに入れるだけのコールバックを登録します。
def on_message(msg):
q.put(msg)
rabbitmq_channel.register_callback(on_message)
または、短いコードが好きな場合:
rabbitmq_channel.register_callback(lambda msg: q.put(msg))
(上記は、RabbitMQ も、RabbitMQ 用の Python バインディングも使用していないため、疑似コードですが、スニペットを実際のアプリケーション コードに適合させる方法を簡単に理解できるはずです。注意を払うべき重要な部分は、q.put(msg)
-just make新しいメッセージが通知されるとすぐにその部分が呼び出されることを確認してください。)
これが発生するとすぐに、スレッドが起動され、メッセージを自由に処理できるようになります。複数のメッセージに同じスレッドを再利用するには、while
ループを使用します。
while True:
msg = q.get()
process_message(msg)
PS Gevent を調べて、それを Python アプリケーションで RabbitMQ と組み合わせる方法をお勧めします。これにより、スレッドを削除し、代わりにスレッドプールを管理することなく、より軽量でスケーラブルなグリーン スレッド メカニズムを使用できるようになります (その場で何万ものグリーンレットを生成して殺します):
# this thing always called in a green thread; forget about pools and queues.
def on_message(msg):
# you're in a green thread now; just process away!
benefit_from("all the gevent goodness!")
spawn_and_join_10_sub_greenlets()
rabbitmq_channel.register_callback(lambda msg: gevent.spawn(on_message, msg))