シナリオ :
イベント ドリブン メカニズムをZeroMQ
(具体的には)評価していました。jeroMq
アプリケーションは、複数のサービス (パブリッシャーとサブスクライバーの両方がサービス) が同じ jvm または個別のノードに存在できる場所に分散されます。これは、デプロイメント アーキテクチャによって異なります。
観察
遊んでみるために、 jero mq (バージョン:0.3.5) を使用して、 transport としてpub
/sub
パターンを作成しました。inproc:
- スレッドの公開は公開できます (公開されているように見えますが、少なくともエラーはありません)
- 別のスレッドにあるサブスクライバーは何も受信していません。
質問
/inproc:
と一緒に使用できますか?pub
sub
グーグルを試してみましたが、具体的なもの、洞察を見つけることができませんでしたか?
pub
/ sub
withのコードサンプルinproc:
jero mq (バージョン:0.3.5) を使用したインプロセス pub サブの作業コード サンプルは、後でこの投稿にアクセスする人にとって役立ちます。A
1 つのパブリッシャーがトピックおよびを公開しB
、2 人のサブスクライバーがA
および をB
別々に受け取る
/**
* @param args
*/
public static void main(String[] args) {
// The single ZMQ instance
final Context context = ZMQ.context(1);
ExecutorService executorService = Executors.newFixedThreadPool(3);
//Publisher
executorService.execute(new Runnable() {
@Override
public void run() {
startPublishing(context);
}
});
//Subscriber for topic "A"
executorService.execute(new Runnable() {
@Override
public void run() {
startFirstSubscriber(context);
}
});
// Subscriber for topic "B"
executorService.execute(new Runnable() {
@Override
public void run() {
startSecondSubscriber(context);
}
});
}
/**
* Prepare the publisher and publish
*
* @param context
*/
private static void startPublishing(Context context) {
Socket publisher = context.socket(ZMQ.PUB);
publisher.bind("inproc://test");
while (!Thread.currentThread().isInterrupted()) {
// Write two messages, each with an envelope and content
try {
publisher.sendMore("A");
publisher.send("We don't want to see this");
LockSupport.parkNanos(1000);
publisher.sendMore("B");
publisher.send("We would like to see this");
} catch (Throwable e) {
e.printStackTrace();
}
}
publisher.close();
context.term();
}
/**
* Prepare and receive through the subscriber
*
* @param context
*/
private static void startFirstSubscriber(Context context) {
Socket subscriber = context.socket(ZMQ.SUB);
subscriber.connect("inproc://test");
subscriber.subscribe("B".getBytes());
while (!Thread.currentThread().isInterrupted()) {
// Read envelope with address
String address = subscriber.recvStr();
// Read message contents
String contents = subscriber.recvStr();
System.out.println("Subscriber1 " + address + " : " + contents);
}
subscriber.close();
context.term();
}
/**
* Prepare and receive though the subscriber
*
* @param context
*/
private static void startSecondSubscriber(Context context) {
// Prepare our context and subscriber
Socket subscriber = context.socket(ZMQ.SUB);
subscriber.connect("inproc://test");
subscriber.subscribe("A".getBytes());
while (!Thread.currentThread().isInterrupted()) {
// Read envelope with address
String address = subscriber.recvStr();
// Read message contents
String contents = subscriber.recvStr();
System.out.println("Subscriber2 " + address + " : " + contents);
}
subscriber.close();
context.term();
}