1

そのため、jzmq GIT マスター ブランチと ZeroMQ 3.2.3 を使用して独自のものを作成しています。

インストール後PUB/SUB、パブリッシャーとサブスクライバーが 1 つのプロセスで対話する次の簡単なプログラムをテストしようとしました。テストは Windows で行うため、TCP を使用しました。

 public class ZMQReadynessTest {

     private ZMQ.Context context;

     @Before
     public void setUp() {
         context = ZMQ.context(1);
     }

     @Test
     public void testSimpleMessage() {
         String topic = "tcp://127.0.0.1:31216";
         final AtomicInteger counter = new AtomicInteger();

      // _____________________________________ create a simple subscriber

         final ZMQ.Socket subscribeSocket = context.socket(ZMQ.SUB);
         subscribeSocket.connect(topic);
         subscribeSocket.subscribe("TestTopic".getBytes());

         Thread subThread = new Thread() {

             @Override
             public void run() {
                 while (true) {
                     String value = null;

                  // This would result in trouble /\/\/\/\/\/\/\/\/\
                     {
                         ByteBuffer buffer = ByteBuffer.allocateDirect(100);
                         if (subscribeSocket.recvZeroCopy( buffer,
                                                           buffer.remaining(),
                                                           ZMQ.DONTWAIT
                                                           ) > 0 ) {

                             buffer.flip();

                             value = buffer.asCharBuffer().toString();
                             System.out.println(buffer.asCharBuffer().toString());
                         }
                     }

                  // This works perfectly + + + + + + + + + + + + + 
                     /*
                     {
                         byte[] bytes = subscribeSocket.recv(ZMQ.DONTWAIT);
                         if (bytes == null || bytes.length == 0) {
                             continue;
                         }

                         value = new String(bytes);
                     }
                      */

                     if (value != null && value.length() > 0) {
                         counter.incrementAndGet();
                         System.out.println(value);
                         break;
                     }
                 }
             }
         };
         subThread.start();

      // _____________________________ create a simple publisher

         ZMQ.Socket publishSocket = context.socket(ZMQ.PUB);
         publishSocket.bind("tcp://*:31216");

         try {
             Thread.sleep(3000); //  + wait 3 sec to make sure its ready
         } catch (InterruptedException e) {
             e.printStackTrace();
             fail();
         }

      // publish a sample message
         try {
             publishSocket.send("TestTopic".getBytes(), ZMQ.SNDMORE);
             publishSocket.send("This is test string".getBytes(), 0);
             subThread.join(100);
         } catch (InterruptedException e) {
             e.printStackTrace();
             fail();
         }

         assertTrue(counter.get() > 0);
         System.out.println(counter.get());
     }
 }

ご覧のとおり、サブスクライバーで単純な.recv(ZMQ.DONTWAIT)方法を使用すると、完全に機能します。ただし、ダイレクト バイト バッファを使用している場合、何も返されませんでした。次の例外が発生しました。プログラムの終了時のようです。

Exception in thread "Thread-0" org.zeromq.ZMQException: Resource temporarily unavailable(0xb)
at org.zeromq.ZMQ$Socket.recvZeroCopy(Native Method)
at ZMQReadynessTest$1.run(ZMQReadynessTest.java:48)

ByteBufferまた、上記の例外をスローしない単純な (直接バッファーではない) を使用しようとしました。何も返してくれません。

上記を解決する方法を知っている人はいますか?

byte[]私はいくつかの高性能システムを実行しているので、オブジェクトをあちこちに作成したくありません。これが解決できない場合は、代わりに Unsafe を使用するだけです。でも本当は「あるべき姿」で働きたい。

前もって感謝します。

アレックス

4

0 に答える 0