Python で RabbitMQ にアクセスするためにpy-amqplibを使用しています。アプリケーションは、特定の MQ トピックをリッスンする要求を時々受け取ります。
このようなリクエストを初めて受信すると、AMQP 接続とチャネルを作成し、新しいスレッドを開始してメッセージをリッスンします。
connection = amqp.Connection(host = host, userid = "guest", password = "guest", virtual_host = "/", insist = False)
channel = connection.channel()
listener = AMQPListener(channel)
listener.start()
AMQPListenerは非常に単純です。
class AMQPListener(threading.Thread):
def __init__(self, channel):
threading.Thread.__init__(self)
self.__channel = channel
def run(self):
while True:
self.__channel.wait()
接続を作成した後、次のように、関心のあるトピックにサブスクライブします。
channel.queue_declare(queue = queueName, exclusive = False)
channel.exchange_declare(exchange = MQ_EXCHANGE_NAME, type = "direct", durable = False, auto_delete = True)
channel.queue_bind(queue = queueName, exchange = MQ_EXCHANGE_NAME, routing_key = destination)
def receive_callback(msg):
self.queue.put(msg.body)
channel.basic_consume(queue = queueName, no_ack = True, callback = receive_callback)
これで初めて、すべて正常に動作します。ただし、後続の別のトピックへのサブスクライブ要求では失敗します。後続のリクエストでは、AMQP 接続と AMQPListener スレッドを再利用し (トピックごとに新しいスレッドを開始したくないため)、channel.queue_declare()メソッド呼び出しの上のコード ブロックを呼び出しても戻りません。また、その時点で新しいチャネルを作成しようとしましたが、connection.channel()呼び出しも返されません。
私がそれを機能させることができた唯一の方法は、トピックごとに新しい接続、チャネル、およびリスナー スレッド (つまり、routing_key) を作成することですが、これは実際には理想的ではありません。どういうわけか接続全体をブロックしているのは wait() メソッドだと思いますが、どうすればよいかわかりません。単一のリスナー スレッドを使用して、複数のルーティング キー (または複数のチャネル) を持つメッセージを受信できるはずですか?
関連する質問:トピックが不要になった場合、リスナー スレッドを停止するにはどうすればよいですか? メッセージがない場合、 channel.wait() 呼び出しは永久にブロックされているように見えます。私が考えることができる唯一の方法は、ダミーメッセージをキューに送信して「毒」することです。リスナーによって停止の合図として解釈されます。