4

私はZeroMQを初めて使用します。私はガイドを読み、現在例を調べているだけでなく、ウェブ上の他の関連情報を見ています。どのメッセージ パターンを使用するか、または 2 つのパターンを組み合わせて使用​​する必要があるかどうかについて、私は判断がつきません。

交換が必要な自家製のメッセージング システムを備えた既存のソフトウェア アプリケーションがあります。私はかなり単純なアーキテクチャを持っています:

|Client|<----->|driver1|
           |
           |---|driverN|

現在、一度に 1 つの「クライアント」のみがドライバーに接続します。多くのドライバーが存在する可能性があります。

(実際には、この場合のクライアントは真のクライアント アプリケーションではなく、一種の仲介者です。この議論では、クライアントとして扱うことができます)

メッセージ:

  1. クライアントはドライバーにコマンドを発行します。
  2. ドライバーは、コマンドに応答してステータス/状態情報を返します。
  3. ドライバーはデータ要素を生成します (つまり、ステータス/状態情報ではありません)。
  4. 一部のクライアント メッセージは、接続されているすべてのデバイスに送信され、一部は単一のドライバーにのみ送信されます。

ドライバーは、同じシステム上に存在する場合もあれば、LAN 上にリモートで存在する場合もあります。これはパブリック ネットワークではありません。

現在、各ドライバーに pub および sub ソケットを、クライアントに sub/pub ソケットを配置することを考えています。接続が確立されたら、メッセージをドロップしないでください。クライアントがさまざまなドライバーのデータ型をサブスクライブし、ドライバーがクライアントのコマンド メッセージをサブスクライブすると仮定します。

重要な考慮事項: 低レイテンシー、可能な限り低い帯域幅オーバーヘッド。

提案や推奨事項をいただければ幸いです。前もって感謝します!

4

1 に答える 1

7

あなたは素晴らしい学習演習を選びました。それは確かです!

これらを読んでください。これらは、ポーリングを備えたカスタムのルーター間プロキシを使用した要求/応答の基本的な実装を提供し、クライアントからデバイスへの問題に対処する必要があります。

このソリューションは同期的であるため、クライアントから送信された要求は、応答を受け取るまでブロックされます。個人的には、完全な柔軟性のためにリクエストとリプライの両方に async を使用しますが、そのソリューションはより複雑です。ただし、呼ばれる本には、非同期の要求/応答を示す例がありFreelanceますDealer/Router

同期多対多の要求/応答の例を次に示します。このアプローチの仕組みを完全に理解するには、ZeroMq エンベロープがどのように機能するかを知っておく必要があります。例lbbroker1を参照してください。

クライアント

setIdentity();でクライアント ID を設定します。応答ルーティングにとって重要です。
クライアントは 、 などのリクエストをループで送信device1device2ます。デバイスが存在する場合は、特定のデバイスからステータス メッセージが返されます。それ以外の場合は、「デバイスなし」がクライアントに返されます。

    Socket client = context.socket(ZMQ.REQ);
    client.setIdentity("client1".getBytes());
    client.connect("tcp://localhost:5550");

    for( int i = 0; i < 5; i++){
       client.send("device" + i);   
       String reply = client.recvStr();
       log("Received message: " + reply);
       Thread.currentThread().sleep(1000);
}

デバイス

デバイス セット ID は、一意のルーティング用のクライアントと同様です。
デバイスdevice.send("DEVICEREADY")がサーバーに送信して、オンラインで利用できることを知らせます。
デバイスはrecvStr()、サーバーから完全なエンベロープを読み取るために 3 回実行します。

String deviceId = "device1"
Socket device = context.socket(ZMQ.REQ);
device.setIdentity(deviceId.getBytes());

device.connect( "tcp://localhost:5560");
device.send( "DEVICEREADY");

while (!Thread.currentThread().isInterrupted()) {
   String clientAddress = device.recvStr();             
   String empty = device.recvStr();
   String clientRequest = device.recvStr();

   //create envelope to send reply to same client who made request          
   device.sendMore(clientAddress);
   device.sendMore("");
   device.send( "stauts on " + deviceId + " is ok");
}

サーバー (ルーター/ルーター)

ROUTER ソケットを使用するカスタム プロキシ。クライアントはフロントエンド ROUTER ソケットに接続し、デバイスはバックエンド ルーターに接続します。サーバーは両方のソケットでメッセージをポーリングします。

Context context = ZMQ.context(1);       
Socket frontend = context.socket(ZMQ.ROUTER);
Socket backend = context.socket(ZMQ.ROUTER);
frontend.bind( "tcp://localhost:5550");
backend.bind(  "tcp://localhost:5560");         

Poller poller = new Poller(2);  
poller.register(frontend, Poller.POLLIN);
poller.register(backend, Poller.POLLIN);

while (!Thread.currentThread().isInterrupted()) {
   poller.poll();

   //frontend poller
   if (poller.pollin(0)) {
      String clientId = frontend.recvStr();
      String empty = frontend.recvStr(); //empty frame
      String deviceId = frontend.recvStr();

      //if client is requesting to talk to nonexistent deviceId,
      //return message "no device", otherwise, create envelope and send
      //request on backend router to device.
      if( deviceMap.get( deviceId) == null ){
         frontend.sendMore(clientId);
         frontend.sendMore("");
         frontend.send("no deviceId: " + deviceId);
      } else {
        //request envelope addressed to specific device
        backend.sendMore(deviceId);
        backend.sendMore("");
        backend.sendMore(clientId);
        backend.sendMore("");
        backend.send("hello from " + clientId);
       }
   }

    //backend poller  
    if(poller.pollin(1)){
       String deviceId = backend.recvStr();
       String empty = backend.recvStr();
       String clientId = backend.recvStr();

       //device signaling it's ready
       //store deviceId in map, don't send a response
       if( clientId.equals("DEVICEREADY"))
           deviceMap.put(deviceId, deviceId);

       else {
           //the device is sending a response to a client
           //create envelope addressed to client, send on frontend socket
           empty = backend.recvStr();
           String reply = backend.recvStr();
           frontend.sendMore(clientId);
           frontend.sendMore("");
           frontend.send(reply);
        }
    }
}
于 2013-08-02T05:53:24.713 に答える