6

これが私のスクリプトです。


#!/usr/bin/env python

import traceback
import sys
import zmq
from time import sleep

print "Creating the zmq.Context"
context = zmq.Context()

print "Binding the publisher to the local socket at port 5557"
sender = context.socket(zmq.PUB)
sender.bind("tcp://*:5557")

print "Binding the subscriber to the local socket at port 5557"
receiver = context.socket(zmq.SUB)
receiver.connect("tcp://*:5557")

print "Setting the subscriber option to get only those originating from \"B\""
receiver.setsockopt(zmq.SUBSCRIBE, "B")

print "Waiting a second for the socket to be created."
sleep(1)

print "Sending messages"
for i in range(1,10):
    msg = "msg %d" % (i)
    env = None
    if i % 2 == 0:
        env = ["B", msg]
    else:
        env = ["A", msg]
    print "Sending Message:  ", env
    sender.send_multipart(env)

print "Closing the sender."
sender.close()

failed_attempts = 0
while failed_attempts < 3:
    try:
        print str(receiver.recv_multipart(zmq.NOBLOCK))
    except:
        print traceback.format_exception(*sys.exc_info())
        failed_attempts += 1 

print "Closing the receiver."
receiver.close()

print "Terminating the context."
context.term()

"""
Output:

Creating the zmq.Context
Binding the publisher to the local socket at port 5557
Binding the subscriber to the local socket at port 5557
Setting the subscriber option to get only those originating from "B"
Waiting a second for the socket to be created.
Sending messages
Sending Message:   ['A', 'msg 1']
Sending Message:   ['B', 'msg 2']
Sending Message:   ['A', 'msg 3']
Sending Message:   ['B', 'msg 4']
Sending Message:   ['A', 'msg 5']
Sending Message:   ['B', 'msg 6']
Sending Message:   ['A', 'msg 7']
Sending Message:   ['B', 'msg 8']
Sending Message:   ['A', 'msg 9']
Closing the sender.
['B', 'msg 2']
['B', 'msg 4']
['B', 'msg 6']
['B', 'msg 8']
['Traceback (most recent call last):\n', '  File "./test.py", line 43, in \n    print str(receiver.recv_multipart(zmq.NOBLOCK))\n', '  File "socket.pyx", line 611, in zmq.core.socket.Socket.recv_multipart (zmq/core/socket.c:5181)\n', '  File "socket.pyx", line 514, in zmq.core.socket.Socket.recv (zmq/core/socket.c:4811)\n', '  File "socket.pyx", line 548, in zmq.core.socket.Socket.recv (zmq/core/socket.c:4673)\n', '  File "socket.pyx", line 99, in zmq.core.socket._recv_copy (zmq/core/socket.c:1344)\n', 'ZMQError: Resource temporarily unavailable\n']
['Traceback (most recent call last):\n', '  File "./test.py", line 43, in \n    print str(receiver.recv_multipart(zmq.NOBLOCK))\n', '  File "socket.pyx", line 611, in zmq.core.socket.Socket.recv_multipart (zmq/core/socket.c:5181)\n', '  File "socket.pyx", line 514, in zmq.core.socket.Socket.recv (zmq/core/socket.c:4811)\n', '  File "socket.pyx", line 548, in zmq.core.socket.Socket.recv (zmq/core/socket.c:4673)\n', '  File "socket.pyx", line 99, in zmq.core.socket._recv_copy (zmq/core/socket.c:1344)\n', 'ZMQError: Resource temporarily unavailable\n']
['Traceback (most recent call last):\n', '  File "./test.py", line 43, in \n    print str(receiver.recv_multipart(zmq.NOBLOCK))\n', '  File "socket.pyx", line 611, in zmq.core.socket.Socket.recv_multipart (zmq/core/socket.c:5181)\n', '  File "socket.pyx", line 514, in zmq.core.socket.Socket.recv (zmq/core/socket.c:4811)\n', '  File "socket.pyx", line 548, in zmq.core.socket.Socket.recv (zmq/core/socket.c:4673)\n', '  File "socket.pyx", line 99, in zmq.core.socket._recv_copy (zmq/core/socket.c:1344)\n', 'ZMQError: Resource temporarily unavailable\n']
Closing the receiver.
Terminating the context.
"""

そして、問題は...なぜこのコードが機能しないのですか?

[編集]zeromqメーリングリストで非常に迅速な応答を得た後、上記のコードを更新しました。

4

2 に答える 2

10

クレジット:Chuck Remes

ソケットの作成手順(バインド、接続、setsockopt)と実際のメッセージの送信の間に「スリープ」が必要になる場合があります。バインドと接続の操作は非同期であるため、すべてのメッセージを送信するロジックに到達するまでに完了しない場合があります。その場合、zmq_bind()操作は別のソケットが正常に接続されるまでキューを作成しないため、 PUBソケットを介して送信されたメッセージはすべてドロップされます。

ちなみに、この例では2つのコンテキストを作成する必要はありません。両方のソケットを同じコンテキスト内で作成できます。痛くはありませんが、必要ではありません。

クレジット:Pieter

これを説明する「問題解決者」がCh1の終わりにあります。

一部のソケットタイプ(ROUTERおよびPUB)は、受信者がいないメッセージをサイレントにドロップします。チャックが言ったように、接続は非同期であり、約100ミリ秒かかります。2つのスレッドを開始し、一方をバインドし、もう一方を接続し、すぐにそのようなソケットタイプを介してデータを送信し始めると、最初の100ミリ秒のデータが失われます(約)。

睡眠をとることは、残忍な「それが機能することを証明する」オプションです。現実的には、何らかの方法で同期するか、(より一般的には)通常の起動の一部としてメッセージの損失を予期します(つまり、公開されたデータを明確な開始または終了のない純粋なブロードキャストとして表示します)。

詳細については、天気予報の更新例、syncpubおよびsyncsubを参照してください。

于 2011-06-05T20:15:18.963 に答える
2

ネクロ投稿ですが、睡眠以外の解決策に興味がある人のために、モニターがあります。

モニターのコールバックを設定して、ZMQ_EVENT_CONNECTEDイベントで呼び出されるようにすることができます。

詳細と例については、http://api.zeromq.org/3-3:zmq-ctx-set-monitorをご覧ください。

于 2019-08-09T17:32:29.083 に答える