0

最初に私の状況を少し説明します。達成したいのは、複数のプロデューサーと複数のコンシューマー、およびサーバー上の 1 つのブローカーです。メッセージを永続化し、キューを耐久性のあるものにしたいと考えています。互いに干渉しない複数のキューが必要です。キューが 1 つ、プロデューサーが 1 つ、コンシューマーが 1 つで、これは既に正常に機能します。現在のコード:

消費者:

from qpid.connection import Connection
from qpid.datatypes import Message, uuid4
from qpid.util import connect

socket = connect('localhost',5672)
connection = Connection(sock=socket, username='***', password='***')
connection.start()
session = connection.session(str(uuid4()))
local_queue_name = 'my_queue'
queue = session.incoming(local_queue_name)
session.message_subscribe(queue='ProdSixQueue', destination=local_queue_name)
queue.start()
content=''
while content != 'Done':
    message = queue.get(timeout=1000000)
    while(queue.empty() == True):
        time.sleep(1)
    content = message.body
    if(validate(etree.fromstring(content))):
        session.message_accept(RangedSet(message.id)) 
        store(content) #my function to store data..

プロデューサー:

from qpid.connection import Connection
from qpid.datatypes import Message, uuid4
from qpid.util import connect

def produce(theMsg):
        socket = connect('ser.ver.s.ip',5672)
        connection = Connection(sock=socket,username='***',password='***')
        connection.start()
        session = connection.sesssion(str(uuid4()))
        session.queue_declare(queue='ProdSixQueue')
        session.exchange_bind(exchange='amg.direct',queue='ProdSixQueue',binding_key='routing_key')
        properties = session.delivery_properties(routing_key='routing_key')
        session.message_transfer(destination='amq.direct',message=Message(properties,str(theMsg)))
        session.close(timeout=100)

もちろん、これらはプログラム全体ではありませんが、Qpid ブローカーに関係するすべてのコードです。

問題は、新しいコンシューマーとプロデューサーを異なるデバイスに作成し、プロデューサーからコンシューマーに移動したい異なるトラフィックがある場合、ProdSevenQueue などの別のキューに送信しても、最初のコンシューマーがブローカーからすべてのデータを盗むことです。 、および他のコンシューマでは、「prodseven_local_Queue」のような別のローカル キューに格納しようとします。どういうわけかキューを別の方法で使用する必要がありますか、それともここで全体のアイデアを誤解していますか? また、broker top を構成して特定の種類のトラフィックを別の場所にリダイレクトすることについて話している人もいますが、この例は見つかりません。右方向へのプッシュは素晴らしいでしょう。さらに混乱させるために、状況の写真を次に示します。imgur へのリンク

私の弁護のために、私はこれまで AMQP や Apache Qpid で何もしたことがなく、まだ Python も学んでいます。また、もう少し不平を言うと、Apache Qpid ほど一貫性のないドキュメントを見たことがありません。明らかに新しいプレーヤーへの愛はありません。

4

1 に答える 1

1

あなたのプログラムは、Qpid の仕組みを誤解していることを示唆しています。

Qpid (AMQP プロトコル 0-8 から 0-10 の場合) では、メッセージ プロデューサーがExchangeにメッセージを送信します。Exchange は、メッセージを 0 個以上のQueuesにルーティングする役割を果たします。そのルーティングの正確な詳細は、交換の種類によって異なります。このメカニズムを通じて、Qpid は一般的なメッセージング トポロジ (ポイント ツー ポイント、パブリッシュ/サブスクライブ、ファンアウトなど) をサポートします。

ユースケースでは、直接交換のインスタンス (組み込みの amq.direct など) を使用する必要があります。

ダイレクト エクスチェンジは、メッセージのルーティング キーと、キューをエクスチェンジにバインドするために使用されるバインディング キーとの正確な一致に基づいて、メッセージをキューにルーティングします。一般的な規則は、キュー自体の名前をバインディング キーとして使用して、キューをエクスチェンジにバインドすることです。あなたのプログラムは現在、この目的で文字列「routing_key」を使用しているようです。あなたが観察した望ましくない動作について説明できると思います。

詳細については、次を参照してください。

http://qpid.apache.org/releases/qpid-0.24/java-broker/book/Java-Broker-Concepts-Exchanges.html (Qpid Java Broker のドキュメント - ただし、概念は共有されています)

https://access.redhat.com/site/documentation/en-US/Red_Hat_Enterprise_MRG/1.1/html/Messaging_User_Guide/chap-Messaging_User_Guide-Exchanges.html (Redhat の MRG ドキュメントでは、同じ概念が便利な図で説明されています)

Python の例 (Qpid の Web サイトを参照) は参考になります。

于 2013-09-10T22:35:04.900 に答える