XPUBはメッセージを受信します。受信するメッセージは、接続されているサブスクライバーからのサブスクリプションのみです。これらのメッセージは、XSUBを介してそのままアップストリームに転送する必要があります。
メッセージを中継する最も簡単な方法は、zmq_proxyを使用することです。
xpub = ctx.socket(zmq.XPUB)
xpub.bind(xpub_url)
xsub = ctx.socket(zmq.XSUB)
xsub.bind(xsub_url)
pub = ctx.socket(zmq.PUB)
pub.bind(pub_url)
zmq.proxy(xpub, xsub, pub)
これは、xpubおよびxsubとの間でメッセージを中継します。オプションで、PUBソケットを追加して、いずれかの方向に通過するトラフィックを監視できます。
途中のユーザーコードで追加のルーティングロジックを実装する場合は、次のような操作を行います。これにより、次の内部ループが再実装されますzmq_proxy
。
def broker(ctx):
xpub = ctx.socket(zmq.XPUB)
xpub.bind(xpub_url)
xsub = ctx.socket(zmq.XSUB)
xsub.bind(xsub_url)
poller = zmq.Poller()
poller.register(xpub, zmq.POLLIN)
poller.register(xsub, zmq.POLLIN)
while True:
events = dict(poller.poll(1000))
if xpub in events:
message = xpub.recv_multipart()
print "[BROKER] subscription message: %r" % message[0]
xsub.send_multipart(message)
if xsub in events:
message = xsub.recv_multipart()
# print "publishing message: %r" % message
xpub.send_multipart(message)
# insert user code here
完全に機能する(Python)の例