5

シナリオ :

イベント ドリブン メカニズムをZeroMQ(具体的には)評価していました。jeroMq

アプリケーションは、複数のサービス (パブリッシャーとサブスクライバーの両方がサービス) が同じ jvm または個別のノードに存在できる場所に分散されます。これは、デプロイメント アーキテクチャによって異なります。

観察

遊んでみるために、 jero mq (バージョン:0.3.5) を使用して、 transport としてpub/subパターンを作成しました。inproc:

  1. スレッドの公開は公開できます (公開されているように見えますが、少なくともエラーはありません)
  2. 別のスレッドにあるサブスクライバーは何も受信していません。

質問

/inproc:と一緒に使用できますか?pubsub

グーグルを試してみましたが、具体的なもの、洞察を見つけることができませんでしたか?

pub/ subwithのコードサンプルinproc:

jero mq (バージョン:0.3.5) を使用したインプロセス pub サブの作業コード サンプルは、後でこの投稿にアクセスする人にとって役立ちます。A1 つのパブリッシャーがトピックおよびを公開しB、2 人のサブスクライバーがAおよび をB別々に受け取る

/**
 * @param args
 */
public static void main(String[] args) {

    // The single ZMQ instance
    final Context context = ZMQ.context(1);

    ExecutorService executorService = Executors.newFixedThreadPool(3);
    //Publisher
    executorService.execute(new Runnable() {

        @Override
        public void run() {
            startPublishing(context);
        }
    });
    //Subscriber for topic "A"
    executorService.execute(new Runnable() {

        @Override
        public void run() {
            startFirstSubscriber(context);
        }
    });
    // Subscriber for topic "B"
    executorService.execute(new Runnable() {

        @Override
        public void run() {
            startSecondSubscriber(context);
        }
    });

}

/**
 * Prepare the publisher and publish
 * 
 * @param context
 */
private static void startPublishing(Context context) {

    Socket publisher = context.socket(ZMQ.PUB);
    publisher.bind("inproc://test");
    while (!Thread.currentThread().isInterrupted()) {
        // Write two messages, each with an envelope and content
        try {
            publisher.sendMore("A");
            publisher.send("We don't want to see this");
            LockSupport.parkNanos(1000);
            publisher.sendMore("B");
            publisher.send("We would like to see this");
        } catch (Throwable e) {

            e.printStackTrace();
        }
    }
    publisher.close();
    context.term();
}

/**
 * Prepare and receive through the subscriber
 * 
 * @param context
 */
private static void startFirstSubscriber(Context context) {

    Socket subscriber = context.socket(ZMQ.SUB);

    subscriber.connect("inproc://test");

    subscriber.subscribe("B".getBytes());
    while (!Thread.currentThread().isInterrupted()) {
        // Read envelope with address
        String address = subscriber.recvStr();
        // Read message contents
        String contents = subscriber.recvStr();
        System.out.println("Subscriber1 " + address + " : " + contents);
    }
    subscriber.close();
    context.term();

}

/**
 * Prepare and receive though the subscriber
 * 
 * @param context
 */
private static void startSecondSubscriber(Context context) {
    // Prepare our context and subscriber

    Socket subscriber = context.socket(ZMQ.SUB);

    subscriber.connect("inproc://test");
    subscriber.subscribe("A".getBytes());
    while (!Thread.currentThread().isInterrupted()) {
        // Read envelope with address
        String address = subscriber.recvStr();
        // Read message contents
        String contents = subscriber.recvStr();
        System.out.println("Subscriber2 " + address + " : " + contents);
    }
    subscriber.close();
    context.term();

}
4

1 に答える 1

5

ZMQinprocトランスポートは、異なるスレッド間で単一のプロセス内で使用することを目的としています。「同じjvmまたは個別のノードに存在できます」(私の強調)と言うとき、単一のプロセス内の複数のスレッドではなく、複数のプロセスを分散サービスとしてスピンアップしていることを意味すると思います。

その場合、いいえ、あなたがやろうとしていることはinproc. PUB-SUB/inproc複数のスレッド間の単一のプロセス内で正常に機能します。


コメントでさらに質問に対処するために編集します。

inprocorのようなトランスポートを使用する理由ipcは、それらを使用する適切なコンテキストにいる場合、tcp トランスポートよりも少し効率的 (高速) であるためです。トランスポートを組み合わせて使用​​することも考えられますが、それを機能させるには、常に同じトランスポートにバインドして接続する必要があります。

これは、各ノードが最大 3PUBSUBのソケットを必要とすることを意味します。つまりtcp、リモート ホスト上ipcのノードと通信するパブリッシャー、同じホスト上の異なるプロセスのノードと通信するパブリッシャー、同じホストのinproc異なるスレッドのノードと通信するパブリッシャーです。処理する。

実際には、ほとんどの場合、tcpトランスポートを使用するだけで、すべてに対して 1 つのソケットのみをスピンアップするだけで、tcpどこでも機能します。各ソケットが特定の種類の情報を担当する場合、複数のソケットを起動すること理にかなっています。

常にあるメッセージ タイプを他のスレッドに送信し、別のメッセージ タイプを他のホストに送信する理由がある場合、複数のソケットは理にかなっていますが、あなたの場合、あるノードの観点からは、他のすべてのノードのように聞こえます。は同じ。その場合、私はtcpどこでも使用して、それで終わります。

于 2016-02-17T15:21:41.303 に答える