2

現在、自分の zeromq アプリケーションをテストする可能性を探っています。パブリッシャー/サブスクライバーを同じスレッドに配置して、メッセージを失うことなくパブリッシャーにパブリッシュさせ、サブスクライバーにサブスクライブさせることができるという印象を受けました。しかし、パブリッシャーにいくつかのメッセージを送信させると、サブスクライバーには何も届きません。

私が使用するコードは次のとおりです。

import zmq

def main():
    ctx = zmq.Context.instance()
    sender = ctx.socket(zmq.PUB)
    sender.setsockopt(zmq.HWM, 1000)
    sender.bind('tcp://*:10001')

    rcvr = ctx.socket(zmq.SUB)
    rcvr.setsockopt(zmq.HWM, 1000)
    rcvr.connect('tcp://127.0.0.1:10001')
    rcvr.setsockopt(zmq.SUBSCRIBE, "")

    for i in range(100):
        sender.send('%i' % i)

    while True:
        try:
            print rcvr.recv(zmq.NOBLOCK)
        except zmq.ZMQError:
            break


if __name__ == '__main__':
    main()

これを実行すると、出力が得られません。

私が驚くのは、送信者が送信する前に受信者が接続されているため、これらのメッセージをキューに入れる必要があるということです。それとも、それは明らかに間違っている仮定であり、代わりに PUSH/PULL を使用する必要がありますか?

4

3 に答える 3

2

これは、 ZeroMQ ガイドに記載されている低速ジョイナーの問題のケースだと思います。

この「遅いジョイナー」の症状は、十分な頻度で十分な数の人々に影響を与えるため、詳細に説明します.

主な問題は、サブスクライバー ソケットがリッスンを開始する前にすべてのメッセージが送信され、メッセージが飛んで破棄されることだと思います。受信者がリッスンを開始する前に最後のメッセージが送信されているため、ソケットのセットアップとメッセージの送信の間に遅延を設定しても機能しません。

あなたが示唆したように、プッシュ/プルソケットはメモリ内でジョブをキューに入れます。このような単一のプロセスでソケット間でジョブを送信できます

# pushpull.py
import zmq

def main():
    ctx = zmq.Context()
    sender = ctx.socket(zmq.PUSH)
    sender.bind('tcp://*:10001')

    rcvr = ctx.socket(zmq.PULL)
    rcvr.connect('tcp://127.0.0.1:10001')

    for i in range(100):
        sender.send_unicode('%i' % i)

    while True:
        msg = rcvr.recv()
        print(msg)

if __name__ == '__main__':
    main()

time.sleep(1)または、pub/sub ソケットを使用する場合は、ソケットのセットアップとメッセージの送信の間に2 つのプロセスと 1 つが必要です。

まず受信機を起動します

# rcvr.py
import zmq

def main():
    ctx = zmq.Context()
    rcvr = ctx.socket(zmq.SUB)
    rcvr.connect('tcp://127.0.0.1:10001')
    rcvr.setsockopt_string(zmq.SUBSCRIBE, "")

    while True:
        msg = rcvr.recv()
        print(msg)

if __name__ == '__main__':
    main()

それから送り主、

# sender.py
import zmq
import time

def main():
    ctx = zmq.Context()
    sender = ctx.socket(zmq.PUB)
    sender.bind('tcp://*:10001')

    time.sleep(1)
    for i in range(100):
        sender.send_unicode('%i' % i)

if __name__ == "__main__":
    main()

受け取るには:

b'0'
b'1'
b'2'
b'3' ...

私は現在、Python 3.3 と pyzmq 13.1.0 で素晴らしいWinPythonディストリビューションを使用しているため、zmq 呼び出しでの文字列処理の一部と print 関数が少し異なります。それが役に立てば幸い。

于 2013-05-22T12:49:05.753 に答える
1

You should be connecting your SUB socket to port 10000 not 10001. Currently the SUB socket is waiting for a publisher and the PUB socket is waiting for a subscriber. 0mq's feature of allowing 'clients' to connect without 'servers' already being present also means there is no error thrown when you connect to port 10001 and that is by design.

于 2013-04-13T23:46:41.773 に答える