2

システムをmorbidの使用からrabbitmqに移行しようとしていますが、デフォルトで提供されているのと同じブロードキャスト動作のmorbidを取得できないようです。ブロードキャストとは、メッセージがキューに追加されると、すべてのコンシューマーがそれを受信することを意味します。うさぎの場合、メッセージが追加されると、すべてのリスナーにラウンドロビンスタイルで配布されます。

同じ種類のメッセージ配信を実現する方法を教えてもらえますか?

以下で使用されているストンプライブラリはhttp://code.google.com/p/stomppy/です。

stompで対応できない場合でも、amqplibの例でさえ本当に役立ちます。

現在の私のコードは次のようになっています

消費者

import stomp

class MyListener(object):
    def on_error(self, headers, message):
        print 'recieved an error %s' % message

    def on_message(self, headers, message):
        print 'recieved a message %s' % message

conn = stomp.Connection([('0.0.0.0', 61613), ('127.0.0.1', 61613)], 'user', 'password')
conn.set_listener('', MyListener())
conn.start()
conn.connect(username="user", password="password")
headers = {}

conn.subscribe(destination='/topic/demoqueue', ack='auto')

while True:
    pass
conn.disconnect()

そして送信者はこのように見えます

import stomp

class MyListener(object):
    def on_error(self, headers, message):
        print 'recieved an error %s' % message

    def on_message(self, headers, message):
        print 'recieved a message %s' % message

conn = stomp.Connection([('0.0.0.0', 61613), ('127.0.0.1', 61613)], 'user', 'password')
conn.set_listener('', MyListener())
conn.start()
conn.connect(username="user", password="password")
headers = {}

conn.subscribe(destination='/topic/demotopic', ack='auto')

while True:
    pass
conn.disconnect()
4

2 に答える 2

3

「受信グループ」ごとに交換を作成することで、最終的にそれを行う方法を見つけました。うさぎが何千もの交換でどれだけうまくいくかわからないので、本番環境で試す前にこれをよくテストすることをお勧めします

送信コード:

conn.send(str(i), exchange=exchange, destination='')

空白の宛先が必要です。気にするのはその取引所に送信することだけです

受け取る

import stomp
import sys
from amqplib import client_0_8 as amqp
#read in the exchange name so I can set up multiple recievers for different exchanges to tset
exchange = sys.argv[1]
conn = amqp.Connection(host="localhost:5672", userid="username", password="password",
 virtual_host="/", insist=False)

chan = conn.channel()

chan.access_request('/', active=True, write=True, read=True)

#declare my exchange
chan.exchange_declare(exchange, 'topic')
#not passing a queue name means I get a new unique one back
qname,_,_ = chan.queue_declare()
#bind the queue to the exchange
chan.queue_bind(qname, exchange=exchange)

class MyListener(object):
    def on_error(self, headers, message):
        print 'recieved an error %s' % message

    def on_message(self, headers, message):
        print 'recieved a message %s' % message

conn = stomp.Connection([('0.0.0.0', 61613), ('127.0.0.1', 61613)], 'browser', 'browser')
conn.set_listener('', MyListener())
conn.start()
conn.connect(username="username", password="password")
headers = {}

#subscribe to the queue
conn.subscribe(destination=qname, ack='auto')

while True:
    pass
conn.disconnect()
于 2009-06-12T14:32:06.223 に答える
3

どうやら、STOMP を直接使用することはできません。ストンプでブロードキャストを機能させるためにジャンプする必要があるすべてのフープを示すメーリング リスト スレッドがあります (これには、いくつかの下位レベルの AMPQ が含まれます)。

于 2009-06-11T14:40:35.820 に答える