4

1台のマシン上でローカルに一時的な一方向FIFO通信パイプを介して通信する必要がある2つのプロセス(「送信者」と「受信者」)があります。これが私がしたいことです(Unixドメインソケットに近い言語を使用して):

  • 送信者は既知のアドレスにパイプを「作成」し、すぐにメッセージを送信します
  • ある時点(送信者がパイプを「作成」する前または後)で、受信者はパイプに接続します
  • リーダーがパイプからメッセージを読み取ります
  • 送信者がパイプを「閉じる」
  • 読者は、すべてのメッセージが読み取られたことに気づきます(おそらくパイプが閉じられています)

私の質問は、ZeroMQでこれを実装するにはどうすればよいですか?「PUB/SUB」、「PUSH / PULL」?ZMQソケットの「データの終わり」を検出するメカニズムは何ですか?上記の最初の2つのアイテムの両方の注文を許可することは可能ですか?それは、送信者または受信者が最初に接続を試みるかどうかです。もしそうなら、どのように?

ありがとう。

4

1 に答える 1

6

zeromq について知っておくべきこと:

  1. 通常、バインド/接続の順序は重要ではありません
  2. PUSH/PULL は、1 つのピアが各メッセージを受信する必要がある場合、および/またはメッセージが破棄されるべきでない場合に使用されます。
  3. PUB/SUB は、すべてのピアがメッセージを受信する必要がある場合、および/または誰もリッスンしていないときに送信されたメッセージをドロップする必要がある場合に使用されます。
  4. ZeroMQ は、意図的に接続/切断のオープン/クローズ イベントをアプリケーション コードから隠すため、実際のクローズ イベントを検出することはできません。

このために知っておかなければならないことの 1 つ: ソケットが接続すると、パイプが作成されます (ピアがまだ存在している必要はありません)。ソケットがバインドされると、ピアが接続されたときにのみパイプが作成されます。これらのパイプは、ソケットの HWM 動作を制御します。これは、ピアのない接続ソケットとピアのないバインディング ソケットの動作が異なることを意味します。ピアを持たないバインド ソケットは、メッセージを送信しようとするとブロックされますが、接続ソケットは、ピアが到着してメッセージの消費を開始するまで、喜んでメッセージをメモリ内のキューに入れます。

これらの点に基づいて、やりたいことは次のとおりです。

  1. PUSH/PULLを使う
  2. 受信者はバインドする必要があります
  3. tcp/ipc レベルのクローズ イベントを検出するのではなく、キューが完了したことを示す特別な「クローズ」メッセージを送信します。

これは、IPC ソケット (ファイル) を使用して通信する Python での実際の例です。ここでは、受信側が送信側の少し後に開始されます。

双方が知っておく必要がある共通の情報:

import time

import zmq

# the file used for IPC communication
PIPE = '/tmp/fifo-pipe'

# command flags for our tiny message protocol
DONE = b'\x00'
MSG = b'\x01'

レシーバー (PULL) がバインドし、DONE になるまで消費します

def receiver():
    ctx = zmq.Context()
    s = ctx.socket(zmq.PULL)
    s.bind("ipc://%s" % PIPE)
    while True:
        parts = s.recv_multipart()
        cmd = parts[0]
        if cmd == DONE:
            print "[R] received DONE"
            break
        msg = parts[1]
        # handle the message
        print "[R] %.1f consuming %s" % (time.time() - t0, msg)
    s.close()
    ctx.term()
    print "[R] done"

送信側 (PUSH) が接続して送信し、DONE を送信して完了を通知します。

def sender():
    ctx = zmq.Context()
    s = ctx.socket(zmq.PUSH)
    s.connect("ipc://%s" % PIPE)

    for i in range(10):
        msg = b'msg %i' % i
        print "[S] %.1f sending %s" % (time.time() - t0, msg)
        s.send_multipart([MSG, msg])
        time.sleep(1)
    print "[S] sending DONE"
    s.send(DONE)
    s.close()
    ctx.term()
    print "[S] done"

そして、それらを一緒に実行するデモ スクリプトです。送信者が最初に開始し、送信者が既に複数のメッセージを送信した後に受信者が開始します。

from threading import Thread

# global t0, just for keeping times relative to start, rather than 1970
t0 = time.time()

# start the sender
s = Thread(target=sender)
s.start()

# start the receiver after a delay
time.sleep(5)
r = Thread(target=receiver)
r.start()

# wait for them both to finish
s.join()
r.join()

ここで一緒に走っているのを見ることができます。

于 2013-03-02T02:10:12.827 に答える