3

Python で RabbitMQ にアクセスするためにpy-amqplibを使用しています。アプリケーションは、特定の MQ トピックをリッスンする要求を時々受け取ります。

このようなリクエストを初めて受信すると、AMQP 接続とチャネルを作成し、新しいスレッドを開始してメッセージをリッスンします。

    connection = amqp.Connection(host = host, userid = "guest", password = "guest", virtual_host = "/", insist = False)
    channel = connection.channel()

    listener = AMQPListener(channel)
    listener.start()

AMQPListenerは非常に単純です。

class AMQPListener(threading.Thread):
    def __init__(self, channel):
        threading.Thread.__init__(self)
        self.__channel = channel

    def run(self):
        while True:
            self.__channel.wait()

接続を作成した後、次のように、関心のあるトピックにサブスクライブします。

channel.queue_declare(queue = queueName, exclusive = False)
channel.exchange_declare(exchange = MQ_EXCHANGE_NAME, type = "direct", durable = False, auto_delete = True)
channel.queue_bind(queue = queueName, exchange = MQ_EXCHANGE_NAME, routing_key = destination)

def receive_callback(msg):
    self.queue.put(msg.body)

channel.basic_consume(queue = queueName, no_ack = True, callback = receive_callback)

これで初めて、すべて正常に動作します。ただし、後続の別のトピックへのサブスクライブ要求では失敗します。後続のリクエストでは、AMQP 接続と AMQPListener スレッドを再利用し (トピックごとに新しいスレッドを開始したくないため)、channel.queue_declare()メソッド呼び出しの上のコード ブロックを呼び出しても戻りません。また、その時点で新しいチャネルを作成しようとしましたが、connection.channel()呼び出しも返されません。

私がそれを機能させることができた唯一の方法は、トピックごとに新しい接続、チャネル、およびリスナー スレッド (つまり、routing_key) を作成することですが、これは実際には理想的ではありません。どういうわけか接続全体をブロックしているのは wait() メソッドだと思いますが、どうすればよいかわかりません。単一のリスナー スレッドを使用して、複数のルーティング キー (または複数のチャネル) を持つメッセージを受信できるはずですか?

関連する質問:トピックが不要になった場合、リスナー スレッドを停止するにはどうすればよいですか? メッセージがない場合、 channel.wait() 呼び出しは永久にブロックされているように見えます私が考えることができる唯一の方法は、ダミーメッセージをキューに送信して「毒」することです。リスナーによって停止の合図として解釈されます。

4

1 に答える 1

1

チャネルごとに複数の消費者が必要な場合は、basic_consume()を使用して別の消費者を接続し、後でchannel.wait()を使用します。basic_consume()を介して接続されたすべてのキューをリッスンします。basic_consume()ごとに異なるコンシューマータグを定義してください。

キュー内の特定のコンシューマーをキャンセルする場合(特定のトピックのリッスンをキャンセルする場合)は、 channel.basic_cancel(consumer_tag)を使用します。

于 2010-01-07T10:10:47.747 に答える