私は RabbitMQ 3.5.1 をrabbit_presence_exchange (バイナリ配布) およびrabbitmq_event_exchange (この問題のデバッグに役立てるため) プラグインと Python Pika クライアントと共に使用しています。
プレゼンス プラグインは、新しい交換タイプx-presenceを提供することで機能します。これにルーティング キーを使用してキューをバインドすると、キューがバインドおよびアンバインドされるときにプレゼンス通知が生成されます (たとえば、ルーティング キーがユーザー名である場合)。ルーティング キーなしでキューをバインドすると、プレゼンス通知を受信するようにサインアップされます。
これで問題ありません。このようなプレゼンス通知を正常に生成して受信できます。しかし、ここでは交換を介してプレゼンス メッセージをルーティングしたいと考えています。最初に、ヘッダー交換を使用しようとしましたが、メッセージが来ていませんでした。そのため、ファンアウト交換に変更しました (ヘッダー マッチングを正しく設定しなかった場合に備えて) が、まだ何も来ていません。 .
これは、追加の交換なしでプレゼンス メッセージを生成および受信するための私のスクリプトです (つまり、これが機能します)。
#!/usr/bin/env python3
import pika
import names
MY_NAME = names.get_first_name()
PRESENCE_EXCHANGE = 'presence'
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.exchange_declare(exchange=PRESENCE_EXCHANGE,
exchange_type='x-presence')
result = channel.queue_declare('', exclusive=True)
queue_name = result.method.queue
print('My name is %s and my queue is %r' % (MY_NAME, queue_name))
channel.queue_bind(exchange=PRESENCE_EXCHANGE,
queue=queue_name,
routing_key=MY_NAME)
channel.queue_bind(exchange=PRESENCE_EXCHANGE,
queue=queue_name,
routing_key='')
def on_message(ch, method, properties, body):
print(method, '\n', properties, '\n', body)
exchange = method.exchange
if exchange == PRESENCE_EXCHANGE:
action = properties.headers['action']
who = properties.headers['key']
if action == 'bind':
print(' [+] %s has come online.' % (who,))
elif action == 'unbind':
print(' [-] %s has gone offline.' % (who,))
channel.basic_consume(queue=queue_name,
on_message_callback=on_message,
auto_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
try:
channel.start_consuming()
except KeyboardInterrupt:
pass
finally:
connection.close()
上記を変更して、プレゼンス メッセージを組み込みのファンアウト交換にルーティングし、キューをそれにバインドしました。
...
print('My name is %s and my queue is %r' % (MY_NAME, queue_name))
channel.queue_bind(exchange=PRESENCE_EXCHANGE,
queue=queue_name,
routing_key=MY_NAME)
channel.exchange_bind(source=PRESENCE_EXCHANGE,
destination='amq.fanout',
routing_key='')
channel.queue_bind(exchange='amq.fanout',
queue=queue_name)
def on_message(ch, method, properties, body):
...
取引所がメッセージを受信していない理由について困惑しています。Erlang は私の言語の 1 つではありません。そのため、Presence プラグインのソースを読み取って、これがサポートされているかどうかを判断するのに苦労しています (サポートされない理由はわかりませんが)。
アイデア (または、RabbitMQ でプレゼンスを処理するためのより良い方法) がある場合は、ぜひお聞かせください。
編集:
このコードと 2 つのクライアントが実行されていると、交換とバインディングは次のようになります。
Listing exchanges ...
direct
amq.direct direct
amq.fanout fanout
amq.headers headers
amq.match headers
amq.rabbitmq.event topic
amq.rabbitmq.log topic
amq.rabbitmq.trace topic
amq.topic topic
presence x-presence
Listing bindings ...
exchange amq.gen-6aU7qS-ikR4cLmxcT6VKDQ queue amq.gen-6aU7qS-ikR4cLmxcT6VKDQ []
exchange amq.gen-MiyEpW9VIxD49PE9SqATFA queue amq.gen-MiyEpW9VIxD49PE9SqATFA []
amq.fanout exchange amq.gen-6aU7qS-ikR4cLmxcT6VKDQ queue amq.gen-6aU7qS-ikR4cLmxcT6VKDQ []
amq.fanout exchange amq.gen-MiyEpW9VIxD49PE9SqATFA queue amq.gen-MiyEpW9VIxD49PE9SqATFA []
presence exchange amq.fanout exchange []
presence exchange amq.gen-6aU7qS-ikR4cLmxcT6VKDQ queue Sheila []
presence exchange amq.gen-MiyEpW9VIxD49PE9SqATFA queue Joaquin []