1

次の点を考慮してください。

  • 3 つの論理サービスのセット: S1S2およびS3
  • 各サービスの 2 つのインスタンスが実行されているため、次のプロセスがS1P1あります。S1P2S2P1S2P2S3P1S3P2
  • ZeroMQ単一のプロセスで実行され、すべてのサービス プロセスから到達可能なブローカー

論理サービス、たとえば は、論理サービスおよびにとって重要なS1メッセージを発行します。各論理サービスの 1 つのプロセスのみが を受信する必要があるため、 ととしましょう。M1S2S3M1S2P1S3P2

私は次のことを試しましたが、成功しませんでした:

  • ブローカー スレッド 1 はXSUB/XPUBプロキシを実行しています
  • ブローカー スレッド 2 は、ソケットに接続され、すべてにサブスクライブされたROUTER/DEALERプロキシを実行しています (logical の場合) 。ROUTERXPUBS1
  • ブローカー スレッド 3 は、ソケットに接続され、すべてにサブスクライブされたROUTER/DEALERプロキシを実行しています (logical の場合) 。ROUTERXPUBS2
  • ブローカ スレッド 4 は、XPUB ソケットに接続され、すべてにサブスクライブされたROUTER/DEALERプロキシを実行しています (論理 の場合) 。ROUTERS3
  • 各論理サービス プロセスはREP、ブローカーDEALERソケットに接続されたソケット スレッドを実行しています。

XSUB/XPUBプロキシはパブリッシュ/サブスクライブのセマンティクスを提供し、プロキシによって送信されるメッセージのソケットROUTER/DEALER間に競合が発生すると考えました。REPXSUB/XPUB

ZeroMQこれを達成するためにソケットを組み合わせるにはどうすればよいですか?

アップデート1

「成功せずに」が役に立たないことはわかっています。さまざまな構成を試してみましたが、さまざまなエラーが発生しました。私が試した最新の構成は次のとおりです。

(XSUB proxy=> XPUB) => (SUB copyLoop=> REQ) => (ROUTER proxy=> DEALER) => REP

copyLoop は次のようになります。

public void start() {
    context = ZMQ.context(1);

    subSocket = context.socket(ZMQ.SUB);
    subSocket.connect(subSocketUrl);
    subSocket.subscribe("".getBytes());

    reqSocket = context.socket(ZMQ.REQ);
    reqSocket.connect(reqSocketUrl);

    while (!Thread.currentThread().isInterrupted()) {
        final Message msg = receiveNextMessage();
        resendMessage(msg);
    }
}

private Message receiveNextMessage() {
    final String header = subSocket.recvStr();
    final String entity = subSocket.recvStr();

    return new Message(header, entity);
}

private void resendMessage(Message msg) {
    reqSocket.sendMore(msg.getKey());
    reqSocket.send(msg.getData(), 0);
}

私が得る例外は次のとおりです。

java.lang.IllegalStateException: Cannot send another request
    at zmq.Req.xsend(Req.java:51) ~[jeromq-0.3.4.jar:na]
    at zmq.SocketBase.send(SocketBase.java:613) ~[jeromq-0.3.4.jar:na]
    at org.zeromq.ZMQ$Socket.send(ZMQ.java:1206) ~[jeromq-0.3.4.jar:na]
    at org.zeromq.ZMQ$Socket.sendMore(ZMQ.java:1189) ~[jeromq-0.3.4.jar:na]
    at com.xyz.messaging.zeromq.SubReqProxyConnector.resendMessage(SubReqProxyConnector.java:47) ~[classes/:na]
    at com.xyz.messaging.zeromq.SubReqProxyConnector.start(SubReqProxyConnector.java:35) ~[classes/:na]

JeroMQ 0.3.4、Oracle Java 8 JVM、および Windows 7 を実行しています。

4

2 に答える 2

1

明らかに、私はいくつかの要素と混同しました:

  • クライアント側ソケット ( Socket.connect) またはサーバー側ソケット ( Socket.bind)として使用しているかどうかに関係なく、ソケットには同じ API があります。
  • ソケットは、タイプに関係なく同じ API を持っています (たとえば、ソケットSocket.subscribeで呼び出すべきではありません)。PUSH
  • 一部のソケット タイプでは、送受信応答ループが必要です (例: REQ/REP) 。
  • コミュニケーションパターンのニュアンス ( PUSH/PULLvs ROUTER/DEALER)
  • ZeroMQ セットアップのデバッグの難しさ (不可能?)

ジェイソンの信じられないほど詳細な回答 (そして素晴らしい図!) に感謝し、正しい方向を示してくれました。

以下のデザインに落ち着きました。

  • ブローカ スレッド 1 は でファンアウトXSUB/XPUBプロキシを実行してbind(localhost:6000)おり、bind(localhost:6001)
  • ブローカ スレッド 2 は と でキューイング プロキシを実行していSUB/PUSHます。ブローカー スレッド 3 と 4 は、バインド ポート番号が異なる同様の設計を使用します。connect(localhost:6001)bind(localhost:6002)
  • メッセージ プロデューサは、PUBソケットを使用してブローカに接続します。connect(localhost:6000)
  • メッセージ コンシューマは、PULL上のソケットを使用してブローカ キューイング プロキシに接続します。connect(localhost:6002)

このサービス固有のキューイング メカニズムに加えて、同様のサービス固有のファンアウト メカニズムを単純に追加することができました。

  • ブローカー スレッドがSUB/PUBプロキシを実行しconnect(localhost:6001)bind(localhost:6003)
  • PUBメッセージ プロデューサは引き続きソケットを使用してブローカに接続します。connect(localhost:6000)
  • SUBメッセージ コンシューマは、上のソケットを使用してブローカ ファンアウト プロキシに接続します。connect(localhost:6003)

これは興味深い乗り物でした。

于 2016-03-14T15:30:44.697 に答える