12

そこで、ZMQで ( ) /( )メッセージング用のプロキシ/ブローカーを作成する方法に関するこの記事を読んでいました。アーキテクチャがどのように見えるかについてのこの素晴らしい写真があります:XPUBXSUB

データフロー: ユーザーコード -> PUB -> XSUB -> ユーザーコード -> XPUB -> SUB -> ユーザーコード;  サブスクリプション フロー: ユーザー コード <- PUB <- XSUB <- ユーザー コード <- XPUB <- SUB <- ユーザー コード。

しかし、XSUBソケットの説明Outgoing routing strategyを見ると、それを 介してすべてのサブスクリプションを転送する方法がわかりません。N/A

では、ZeroMQ でサブスクリプション転送 (非) を実装する方法、そのような転送アプリケーションの最小限のユーザー コード (単純なパブリッシャーサブスクライバーのサンプルの間に挿入できるもの) は何ですか?

4

1 に答える 1

18

XPUBはメッセージを受信します。受信するメッセージは、接続されているサブスクライバーからのサブスクリプションのみです。これらのメッセージは、XSUBを介してそのままアップストリームに転送する必要があります。

メッセージを中継する最も簡単な方法は、zmq_proxyを使用することです。

xpub = ctx.socket(zmq.XPUB)
xpub.bind(xpub_url)
xsub = ctx.socket(zmq.XSUB)
xsub.bind(xsub_url)
pub = ctx.socket(zmq.PUB)
pub.bind(pub_url)
zmq.proxy(xpub, xsub, pub)

これは、xpubおよびxsubとの間でメッセージを中継します。オプションで、PUBソケットを追加して、いずれかの方向に通過するトラフィックを監視できます。

途中のユーザーコードで追加のルーティングロジックを実装する場合は、次のような操作を行います。これにより、次の内部ループが再実装されますzmq_proxy

def broker(ctx):
    xpub = ctx.socket(zmq.XPUB)
    xpub.bind(xpub_url)
    xsub = ctx.socket(zmq.XSUB)
    xsub.bind(xsub_url)

    poller = zmq.Poller()
    poller.register(xpub, zmq.POLLIN)
    poller.register(xsub, zmq.POLLIN)
    while True:
        events = dict(poller.poll(1000))
        if xpub in events:
            message = xpub.recv_multipart()
            print "[BROKER] subscription message: %r" % message[0]
            xsub.send_multipart(message)
        if xsub in events:
            message = xsub.recv_multipart()
            # print "publishing message: %r" % message
            xpub.send_multipart(message)

        # insert user code here

完全に機能する(Python)の例

于 2013-01-29T21:21:12.070 に答える