1

AMQP にメッセージを送信するときに、交換が存在しないかどうかを検出できるようにしたいと考えています。

次の例を検討してください。

#!/usr/bin/python

import amqp
from time import sleep

conn = amqp.Connection(host="localhost:5672", userid="guest", password="guest", virtual_host="/")
outgoing = conn.channel()
message = amqp.Message("x")


while True:
    print "publish message."
    outgoing.basic_publish(message,exchange="non-existing",routing_key="fubar")
    sleep(1)

このスクリプトは取引所への公開を続けますが、取引所が存在しない場合でもエラーは発生しません。交換が存在する場合、メッセージが到着します。

#!/usr/bin/python

import amqp
from time import sleep

conn = amqp.Connection(host="localhost:5672", userid="guest", password="guest", virtual_host="/")
outgoing = conn.channel()
message = amqp.Message("x")


while True:
    print "publish message."
    outgoing.basic_publish(message,exchange="non-existing",routing_key="fubar")
    outgoing.wait()
    sleep(1)

私が Outing.wait() を追加すると、 amqp.exceptions.NotFound が発生します。これは私が望むものです。ただし、問題は、この場合に交換が存在する場合、メッセージは到着しますが、outinging.wait() がループをブロックすることです。(別のスレッドで outside.wait() を実行することもできますが、実行したくありません。)

これに対処する方法は?

アドバイスのヒントポインタは大歓迎です

ありがとう、

ジェイ

4

2 に答える 2

4

交換が存在するかどうかを調べたい場合は、exchange_declare メソッドを使用して、パッシブ フラグを True に設定します。パッシブ フラグを True に設定すると、サーバーが交換を作成しようとするのを防ぎ、交換が存在しない場合は代わりにエラーをスローします。

import amqp
from amqp.exceptions import NotFound

conn = amqp.Connection(host="localhost:5672", userid="guest", password="guest",
                       virtual_host="/")
outgoing = conn.channel()
try:
    outgoing.exchange_declare("fubar", "", passive=True)
except NotFound:
    print "Exchange 'fubar' does not exist!"

公開する前に交換が存在することを確認することに本当に関心がある場合は、送信ループに入る前に宣言するだけです。交換がすでに存在する場合、何も起こりません。交換が存在しない場合は、作成されます。

import amqp

conn = amqp.Connection(host="localhost:5672", userid="guest", password="guest",
                       virtual_host="/")
outgoing = conn.channel()
outgoing.exchange_declare("fubar", "direct")

使用している amqp ライブラリの exchange_declare のメソッド宣言へのリンクは次のとおりです。

https://github.com/celery/py-amqp/blob/master/amqp/channel.py#L460-L461

于 2013-03-01T18:54:50.700 に答える
1

残念ながら、basic_publish() からの例外をチェックするにはブロッキング呼び出しが必要です。ただし、できることは、非同期ループに入る前にブロッキング呼び出しを 1 回実行することです。

# send a test message synchronously to see if the exchange exists
test_message = amqp.Message('exchange_test')
outgoing.basic_publish(test_message,exchange="non-existing",routing_key="fubar")
try:    
    outgoing.wait()
except amqp.exceptions.NotFound:
    # could not find the exchange, so do something about it
    exit()

while True:
    # fairly certain the exchange exists now, run the async loop
    print "publish message."
    outgoing.basic_publish(message,exchange="non-existing",routing_key="fubar")
    sleep(1)
于 2013-02-24T07:51:39.573 に答える