次の点を考慮してください。
- 3 つの論理サービスのセット:
S1
、S2
およびS3
- 各サービスの 2 つのインスタンスが実行されているため、次のプロセスが
S1P1
あります。S1P2
S2P1
S2P2
S3P1
S3P2
ZeroMQ
単一のプロセスで実行され、すべてのサービス プロセスから到達可能なブローカー
論理サービス、たとえば は、論理サービスおよびにとって重要なS1
メッセージを発行します。各論理サービスの 1 つのプロセスのみが を受信する必要があるため、 ととしましょう。M1
S2
S3
M1
S2P1
S3P2
私は次のことを試しましたが、成功しませんでした:
- ブローカー スレッド 1 は
XSUB/XPUB
プロキシを実行しています - ブローカー スレッド 2 は、ソケットに接続され、すべてにサブスクライブされた
ROUTER/DEALER
プロキシを実行しています (logical の場合) 。ROUTER
XPUB
S1
- ブローカー スレッド 3 は、ソケットに接続され、すべてにサブスクライブされた
ROUTER/DEALER
プロキシを実行しています (logical の場合) 。ROUTER
XPUB
S2
- ブローカ スレッド 4 は、XPUB ソケットに接続され、すべてにサブスクライブされた
ROUTER/DEALER
プロキシを実行しています (論理 の場合) 。ROUTER
S3
- 各論理サービス プロセスは
REP
、ブローカーDEALER
ソケットに接続されたソケット スレッドを実行しています。
XSUB/XPUB
プロキシはパブリッシュ/サブスクライブのセマンティクスを提供し、プロキシによって送信されるメッセージのソケットROUTER/DEALER
間に競合が発生すると考えました。REP
XSUB/XPUB
ZeroMQ
これを達成するためにソケットを組み合わせるにはどうすればよいですか?
アップデート1
「成功せずに」が役に立たないことはわかっています。さまざまな構成を試してみましたが、さまざまなエラーが発生しました。私が試した最新の構成は次のとおりです。
(XSUB proxy=> XPUB) => (SUB copyLoop=> REQ) => (ROUTER proxy=> DEALER) => REP
copyLoop は次のようになります。
public void start() {
context = ZMQ.context(1);
subSocket = context.socket(ZMQ.SUB);
subSocket.connect(subSocketUrl);
subSocket.subscribe("".getBytes());
reqSocket = context.socket(ZMQ.REQ);
reqSocket.connect(reqSocketUrl);
while (!Thread.currentThread().isInterrupted()) {
final Message msg = receiveNextMessage();
resendMessage(msg);
}
}
private Message receiveNextMessage() {
final String header = subSocket.recvStr();
final String entity = subSocket.recvStr();
return new Message(header, entity);
}
private void resendMessage(Message msg) {
reqSocket.sendMore(msg.getKey());
reqSocket.send(msg.getData(), 0);
}
私が得る例外は次のとおりです。
java.lang.IllegalStateException: Cannot send another request
at zmq.Req.xsend(Req.java:51) ~[jeromq-0.3.4.jar:na]
at zmq.SocketBase.send(SocketBase.java:613) ~[jeromq-0.3.4.jar:na]
at org.zeromq.ZMQ$Socket.send(ZMQ.java:1206) ~[jeromq-0.3.4.jar:na]
at org.zeromq.ZMQ$Socket.sendMore(ZMQ.java:1189) ~[jeromq-0.3.4.jar:na]
at com.xyz.messaging.zeromq.SubReqProxyConnector.resendMessage(SubReqProxyConnector.java:47) ~[classes/:na]
at com.xyz.messaging.zeromq.SubReqProxyConnector.start(SubReqProxyConnector.java:35) ~[classes/:na]
JeroMQ 0.3.4、Oracle Java 8 JVM、および Windows 7 を実行しています。