14

たとえば、ゼロmq PUBソケットが接続している場合、すべての送信データをバッファリングすることに気付きました

import zmq
import time
context = zmq.Context()

# create a PUB socket
pub = context.socket (zmq.PUB)
pub.connect("tcp://127.0.0.1:5566")
# push some message before connected
# they should be dropped
for i in range(5):
    pub.send('a message should not be dropped')

time.sleep(1)

# create a SUB socket
sub = context.socket (zmq.SUB)
sub.bind("tcp://127.0.0.1:5566")
sub.setsockopt(zmq.SUBSCRIBE, "")

time.sleep(1)

# this is the only message we should see in SUB
pub.send('hi')

while True:
    print sub.recv()

サブはそれらのメッセージの後にバインドします。PUB は、誰も接続していない場合はメッセージをドロップする必要があるため、ドロップする必要があります。ただし、メッセージをドロップする代わりに、すべてのメッセージをバッファリングします。

a message should not be dropped
a message should not be dropped
a message should not be dropped
a message should not be dropped
a message should not be dropped
hi

ご覧のとおり、これらの「メッセージをドロップしないでください」はソケットによってバッファリングされ、接続されると、SUB ソケットにフラッシュされます。PUB ソケットでバインドし、SUB ソケットで接続すると、正しく動作します。

import zmq
import time
context = zmq.Context()

# create a PUB socket
pub = context.socket (zmq.PUB)
pub.bind("tcp://127.0.0.1:5566")
# push some message before connected
# they should be dropped
for i in range(5):
    pub.send('a message should not be dropped')

time.sleep(1)

# create a SUB socket
sub = context.socket (zmq.SUB)
sub.connect("tcp://127.0.0.1:5566")
sub.setsockopt(zmq.SUBSCRIBE, "")

time.sleep(1)

# this is the only message we should see in SUB
pub.send('hi')

while True:
    print repr(sub.recv())

そして、あなたは出力しか見ることができません

'hi'

この種の奇妙な動作は問題を引き起こします。接続ソケット上のすべてのデータをバッファリングします。サーバーが 2 台あり、サーバー A がデータをサーバー B に発行します。

Server A -- publish --> Server B

サーバー B がオンラインになると、正常に動作します。しかし、サーバー A を起動し、サーバー B を起動しないとどうなりますか?

その結果、サーバー A の接続 PUB ソケットはこれらすべてのデータを保持し、メモリ使用量はますます高くなります。

ここに問題があります。この種の動作はバグですか、それとも機能ですか? 機能の場合、この動作について言及しているドキュメントはどこにありますか? また、接続中の PUB ソケットがすべてのデータをバッファリングするのをどのように停止できますか?

ありがとう。

4

6 に答える 6

7

ソケットがメッセージをブロックするかドロップするかは、ZMQ::Socket のドキュメントで説明されているように、ソケットの種類によって異なります(以下の強調は私のものです)。

ZMQ::HWM: ハイ ウォーター マークを取得する

ZMQ::HWM オプションは、指定されたソケットの最高水準点を取得します。ハイ ウォーター マークは、指定されたソケットが通信している単一のピアに対して 0MQ がメモリ内でキューに入れる未処理メッセージの最大数のハード リミットです。

この制限に達すると、ソケットは例外的な状態になり、ソケットのタイプに応じて、0MQ は送信されたメッセージをブロックまたはドロップするなどの適切なアクションを実行します。各ソケット タイプに対して実行される正確なアクションの詳細については、ZMQ::Socket の個々のソケットの説明を参照してください。

デフォルトの ZMQ::HWM 値ゼロは、「制限なし」を意味します。

またはのZMQ::HWM option actionいずれかになるソケットタイプのドキュメントを調べることで、ブロックまたはドロップするかどうかを確認できます。BlockDrop

のアクションZMQ::PUBDropであるため、ドロップしていない場合は、HWM (ハイ ウォーター マーク) 値を確認し、次の警告に注意する必要がありますシステムがメモリ不足になるまで状態を維持します (その時点でシステムがどのように動作するかはわかりません)。

于 2012-01-22T03:24:01.870 に答える
4

この動作は zmq_connect() のセマンティックだと思います。つまり、zmq_connect() が成功を返すと、概念的に接続が確立され、connecting-PUB がドロップする代わりにメッセージのキューイングを開始します

「 ZMQ Guide 」からの次の抜粋は、これに関するヒントです。

ØMQ ソケットの理論では、どちらの端が接続され、どちらの端がバインドされるかは問題ではありません。ただし、PUB-SUB ソケットでは、SUB ソケットをバインドして PUB ソケットを接続すると、SUB ソケットは古いメッセージ、つまり SUB が起動する前に送信されたメッセージを受信する場合があります。これは、バインド/接続が機能する方法のアーティファクトです。可能であれば、PUB をバインドして SUB を接続することをお勧めします。

zmq_connect ()の次のセクションには、以下に示すいくつかのヒントがあります。

従来のソケットとの主な違い

一般的に言えば、従来のソケットは、接続指向の信頼できるバイト ストリーム (SOCK_STREAM) または接続のない信頼できないデータグラム (SOCK_DGRAM) への同期インターフェイスを提供します。比較すると、ØMQ ソケットは非同期メッセージ キューの抽象化を提示し、正確なキューイング セマンティクスは使用中のソケット タイプに依存します。従来のソケットがバイトのストリームまたは個別のデータグラムを転送するのに対し、ØMQ ソケットは個別のメッセージを転送します。

ØMQ ソケットが非同期であるということは、物理接続のセットアップと破棄、再接続、および効果的な配信のタイミングがユーザーに対して透過的であり、ØMQ 自体によって編成されることを意味します。さらに、ピアがメッセージを受信できない場合、メッセージはキューに入れられることがあります。

于 2012-05-04T09:48:54.210 に答える
1

ソケットに HWM オプションを設定します。

于 2012-01-21T23:28:39.800 に答える
0

したがって、bind()とconnect()は、2つの異なる動作をもたらします。好きなものを選んで(bind()のように見えます)、それを使ってみませんか?

実際、接続が確立されるまで送信メッセージをバッファリングすることは、一般的なZeroMQの機能です。

于 2012-01-21T16:41:22.003 に答える
0

pub ソケットの hwm 設定を使用して、ソケットに最高水準点を設定できるはずです。保持するメッセージの数を定義できます。

于 2012-01-21T20:45:46.427 に答える