1

私はamqpサーバー(クラスタリング)としてrabbitmqを使用し、amqpクライアントとしてkombu + py-amqpを使用しています。通常のメッセージキューのメッセージを送信/受信できますが、ミラーキューのメッセージを送信/受信する方法がわからず、見つけることができませんGoogle で回答 .how to send/recv mirror queue message?

私のコード:

 with Connection(hostname='192.168.1.10',userid='test',password='test',virtual_host='test') as conn:
        simple_queue = conn.SimpleQueue('test')

そして例外を取得します

  File "./test_amqp.py", line 38, in send
    simple_queue = conn.SimpleQueue('test')
  File "/usr/local/lib/python3.2/site-packages/kombu/connection.py", line 671, in SimpleQueue
    exchange_opts, **kwargs)
  File "/usr/local/lib/python3.2/site-packages/kombu/simple.py", line 122, in __init__
    consumer = messaging.Consumer(channel, queue)
  File "/usr/local/lib/python3.2/site-packages/kombu/messaging.py", line 338, in __init__
    self.revive(self.channel)
  File "/usr/local/lib/python3.2/site-packages/kombu/messaging.py", line 350, in revive
    self.declare()
  File "/usr/local/lib/python3.2/site-packages/kombu/messaging.py", line 360, in declare
    queue.declare()
  File "/usr/local/lib/python3.2/site-packages/kombu/entity.py", line 471, in declare
    self.queue_declare(nowait, passive=False)
  File "/usr/local/lib/python3.2/site-packages/kombu/entity.py", line 497, in queue_declare
    nowait=nowait)
  File "/usr/local/lib/python3.2/site-packages/amqp/channel.py", line 1240, in queue_declare
    (50, 11),  # Channel.queue_declare_ok
  File "/usr/local/lib/python3.2/site-packages/amqp/abstract_channel.py", line 70, in wait
    return self.dispatch_method(method_sig, args, content)
  File "/usr/local/lib/python3.2/site-packages/amqp/abstract_channel.py", line 88, in dispatch_method
    return amqp_method(self, args)
  File "/usr/local/lib/python3.2/site-packages/amqp/channel.py", line 222, in _close
    (class_id, method_id), ChannelError)
amqp.exceptions.PreconditionFailed: Queue.declare: (406) PRECONDITION_FAILED - inequivalent arg 'x-ha-policy'for queue 'smarton' in vhost 'smarton': received none but current is the value 'all' of type 'longstr'
4

2 に答える 2

0

kombu に問題があるのか​​もしれませんが、引数:{"x-ha-policy": "all"} は entity.Queue.queue_arguments に投稿する必要がありますが、entity.Queue の値を設定するメソッドがありません。 .queue_arguments、私は kombu.simple.SimpleQueue を変更し、正しい結果を得ました:

112         if not isinstance(queue, entity.Queue):
113             exchange = entity.Exchange(name, 'direct', **exchange_opts)
114             queue = entity.Queue(name, exchange, name, **queue_opts)
115+            queue.queue_arguments={'x-ha-policy':'all'}
116         else:
117             name = queue.name
118             exchange = queue.exchange
于 2012-12-21T06:32:31.353 に答える
0

キューを宣言するとき、すべてのオプションは、サーバーに既に存在するものと同じでなければなりません。この場合、サーバーの追加オプションは'x-ha-policy': 'all'.

試すsimple_queue = conn.SimpleQueue('test', queue_opts={"x-ha-policy": "all"})

私はこれをテストしていませんが、うまくいくと思います。

于 2012-12-20T06:45:31.097 に答える