zeromq について知っておくべきこと:
- 通常、バインド/接続の順序は重要ではありません
- PUSH/PULL は、1 つのピアが各メッセージを受信する必要がある場合、および/またはメッセージが破棄されるべきでない場合に使用されます。
- PUB/SUB は、すべてのピアがメッセージを受信する必要がある場合、および/または誰もリッスンしていないときに送信されたメッセージをドロップする必要がある場合に使用されます。
- ZeroMQ は、意図的に接続/切断のオープン/クローズ イベントをアプリケーション コードから隠すため、実際のクローズ イベントを検出することはできません。
このために知っておかなければならないことの 1 つ: ソケットが接続すると、パイプが作成されます (ピアがまだ存在している必要はありません)。ソケットがバインドされると、ピアが接続されたときにのみパイプが作成されます。これらのパイプは、ソケットの HWM 動作を制御します。これは、ピアのない接続ソケットとピアのないバインディング ソケットの動作が異なることを意味します。ピアを持たないバインド ソケットは、メッセージを送信しようとするとブロックされますが、接続ソケットは、ピアが到着してメッセージの消費を開始するまで、喜んでメッセージをメモリ内のキューに入れます。
これらの点に基づいて、やりたいことは次のとおりです。
- PUSH/PULLを使う
- 受信者はバインドする必要があります
- tcp/ipc レベルのクローズ イベントを検出するのではなく、キューが完了したことを示す特別な「クローズ」メッセージを送信します。
これは、IPC ソケット (ファイル) を使用して通信する Python での実際の例です。ここでは、受信側が送信側の少し後に開始されます。
双方が知っておく必要がある共通の情報:
import time
import zmq
# the file used for IPC communication
PIPE = '/tmp/fifo-pipe'
# command flags for our tiny message protocol
DONE = b'\x00'
MSG = b'\x01'
レシーバー (PULL) がバインドし、DONE になるまで消費します
def receiver():
ctx = zmq.Context()
s = ctx.socket(zmq.PULL)
s.bind("ipc://%s" % PIPE)
while True:
parts = s.recv_multipart()
cmd = parts[0]
if cmd == DONE:
print "[R] received DONE"
break
msg = parts[1]
# handle the message
print "[R] %.1f consuming %s" % (time.time() - t0, msg)
s.close()
ctx.term()
print "[R] done"
送信側 (PUSH) が接続して送信し、DONE を送信して完了を通知します。
def sender():
ctx = zmq.Context()
s = ctx.socket(zmq.PUSH)
s.connect("ipc://%s" % PIPE)
for i in range(10):
msg = b'msg %i' % i
print "[S] %.1f sending %s" % (time.time() - t0, msg)
s.send_multipart([MSG, msg])
time.sleep(1)
print "[S] sending DONE"
s.send(DONE)
s.close()
ctx.term()
print "[S] done"
そして、それらを一緒に実行するデモ スクリプトです。送信者が最初に開始し、送信者が既に複数のメッセージを送信した後に受信者が開始します。
from threading import Thread
# global t0, just for keeping times relative to start, rather than 1970
t0 = time.time()
# start the sender
s = Thread(target=sender)
s.start()
# start the receiver after a delay
time.sleep(5)
r = Thread(target=receiver)
r.start()
# wait for them both to finish
s.join()
r.join()
ここで一緒に走っているのを見ることができます。