0

私は RabbitMQ 3.5.1 をrabbit_presence_exchange (バイナリ配布) およびrabbitmq_event_exchange (この問題のデバッグに役立てるため) プラグインと Python Pika クライアントと共に使用しています。

プレゼンス プラグインは、新しい交換タイプx-presenceを提供することで機能します。これにルーティング キーを使用してキューをバインドすると、キューがバインドおよびアンバインドされるときにプレゼンス通知が生成されます (たとえば、ルーティング キーがユーザー名である場合)。ルーティング キーなしでキューをバインドすると、プレゼンス通知を受信するようにサインアップされます。

これで問題ありません。このようなプレゼンス通知を正常に生成して受信できます。しかし、ここでは交換を介してプレゼンス メッセージをルーティングしたいと考えています。最初に、ヘッダー交換を使用しようとしましたが、メッセージが来ていませんでした。そのため、ファンアウト交換に変更しました (ヘッダー マッチングを正しく設定しなかった場合に備えて) が、まだ何も来ていません。 .

これは、追加の交換なしでプレゼンス メッセージを生成および受信するための私のスクリプトです (つまり、これが機能します)。

#!/usr/bin/env python3
import pika
import names

MY_NAME = names.get_first_name()
PRESENCE_EXCHANGE = 'presence'

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.exchange_declare(exchange=PRESENCE_EXCHANGE,
                         exchange_type='x-presence')

result = channel.queue_declare('', exclusive=True)
queue_name = result.method.queue

print('My name is %s and my queue is %r' % (MY_NAME, queue_name))

channel.queue_bind(exchange=PRESENCE_EXCHANGE,
                   queue=queue_name,
                   routing_key=MY_NAME)

channel.queue_bind(exchange=PRESENCE_EXCHANGE,
                   queue=queue_name,
                   routing_key='')


def on_message(ch, method, properties, body):
    print(method, '\n', properties, '\n', body)
    exchange = method.exchange
    if exchange == PRESENCE_EXCHANGE:
        action = properties.headers['action']
        who = properties.headers['key']
        if action == 'bind':
            print(' [+] %s has come online.' % (who,))
        elif action == 'unbind':
            print(' [-] %s has gone offline.' % (who,))


channel.basic_consume(queue=queue_name,
                      on_message_callback=on_message,
                      auto_ack=True)


print(' [*] Waiting for messages. To exit press CTRL+C')
try:
    channel.start_consuming()
except KeyboardInterrupt:
    pass
finally:
    connection.close()

上記を変更して、プレゼンス メッセージを組み込みのファンアウト交換にルーティングし、キューをそれにバインドしました。

...
print('My name is %s and my queue is %r' % (MY_NAME, queue_name))

channel.queue_bind(exchange=PRESENCE_EXCHANGE,
                   queue=queue_name,
                   routing_key=MY_NAME)

channel.exchange_bind(source=PRESENCE_EXCHANGE,
                      destination='amq.fanout',
                      routing_key='')

channel.queue_bind(exchange='amq.fanout',
                   queue=queue_name)


def on_message(ch, method, properties, body):
...

取引所がメッセージを受信して​​いない理由について困惑しています。Erlang は私の言語の 1 つではありません。そのため、Presence プラグインのソースを読み取って、これがサポートされているかどうかを判断するのに苦労しています (サポートされない理由はわかりませんが)。

アイデア (または、RabbitMQ でプレゼンスを処理するためのより良い方法) がある場合は、ぜひお聞かせください。

編集:
このコードと 2 つのクライアントが実行されていると、交換とバインディングは次のようになります。

Listing exchanges ...
    direct
amq.direct  direct
amq.fanout  fanout
amq.headers headers
amq.match   headers
amq.rabbitmq.event  topic
amq.rabbitmq.log    topic
amq.rabbitmq.trace  topic
amq.topic   topic
presence    x-presence

Listing bindings ...
    exchange    amq.gen-6aU7qS-ikR4cLmxcT6VKDQ  queue   amq.gen-6aU7qS-ikR4cLmxcT6VKDQ  []
    exchange    amq.gen-MiyEpW9VIxD49PE9SqATFA  queue   amq.gen-MiyEpW9VIxD49PE9SqATFA  []
amq.fanout  exchange    amq.gen-6aU7qS-ikR4cLmxcT6VKDQ  queue   amq.gen-6aU7qS-ikR4cLmxcT6VKDQ  []
amq.fanout  exchange    amq.gen-MiyEpW9VIxD49PE9SqATFA  queue   amq.gen-MiyEpW9VIxD49PE9SqATFA  []
presence    exchange    amq.fanout  exchange        []
presence    exchange    amq.gen-6aU7qS-ikR4cLmxcT6VKDQ  queue   Sheila  []
presence    exchange    amq.gen-MiyEpW9VIxD49PE9SqATFA  queue   Joaquin []
4

1 に答える 1

0

これと同じ質問を rabbitmq-users メーリング リストに投稿したところ、次のような回答がありました。

プラグインは、通常の (queue-to-exchange) バインディングで動作することを暗黙的に想定しています [3]。

RabbitMQ 3.5.x は、少なくとも 2 年間サポートされていません。[1][2] をアップグレードしてください。

  1. https://www.rabbitmq.com/upgrade.html
  2. https://www.rabbitmq.com/changelog.html
  3. https://github.com/rabbitmq/presence-exchange/blob/master/src/presence_exchange.erl#L72
于 2019-05-14T08:25:49.463 に答える