そのため、jzmq GIT マスター ブランチと ZeroMQ 3.2.3 を使用して独自のものを作成しています。
インストール後PUB/SUB
、パブリッシャーとサブスクライバーが 1 つのプロセスで対話する次の簡単なプログラムをテストしようとしました。テストは Windows で行うため、TCP を使用しました。
public class ZMQReadynessTest {
private ZMQ.Context context;
@Before
public void setUp() {
context = ZMQ.context(1);
}
@Test
public void testSimpleMessage() {
String topic = "tcp://127.0.0.1:31216";
final AtomicInteger counter = new AtomicInteger();
// _____________________________________ create a simple subscriber
final ZMQ.Socket subscribeSocket = context.socket(ZMQ.SUB);
subscribeSocket.connect(topic);
subscribeSocket.subscribe("TestTopic".getBytes());
Thread subThread = new Thread() {
@Override
public void run() {
while (true) {
String value = null;
// This would result in trouble /\/\/\/\/\/\/\/\/\
{
ByteBuffer buffer = ByteBuffer.allocateDirect(100);
if (subscribeSocket.recvZeroCopy( buffer,
buffer.remaining(),
ZMQ.DONTWAIT
) > 0 ) {
buffer.flip();
value = buffer.asCharBuffer().toString();
System.out.println(buffer.asCharBuffer().toString());
}
}
// This works perfectly + + + + + + + + + + + + +
/*
{
byte[] bytes = subscribeSocket.recv(ZMQ.DONTWAIT);
if (bytes == null || bytes.length == 0) {
continue;
}
value = new String(bytes);
}
*/
if (value != null && value.length() > 0) {
counter.incrementAndGet();
System.out.println(value);
break;
}
}
}
};
subThread.start();
// _____________________________ create a simple publisher
ZMQ.Socket publishSocket = context.socket(ZMQ.PUB);
publishSocket.bind("tcp://*:31216");
try {
Thread.sleep(3000); // + wait 3 sec to make sure its ready
} catch (InterruptedException e) {
e.printStackTrace();
fail();
}
// publish a sample message
try {
publishSocket.send("TestTopic".getBytes(), ZMQ.SNDMORE);
publishSocket.send("This is test string".getBytes(), 0);
subThread.join(100);
} catch (InterruptedException e) {
e.printStackTrace();
fail();
}
assertTrue(counter.get() > 0);
System.out.println(counter.get());
}
}
ご覧のとおり、サブスクライバーで単純な.recv(ZMQ.DONTWAIT)
方法を使用すると、完全に機能します。ただし、ダイレクト バイト バッファを使用している場合、何も返されませんでした。次の例外が発生しました。プログラムの終了時のようです。
Exception in thread "Thread-0" org.zeromq.ZMQException: Resource temporarily unavailable(0xb) at org.zeromq.ZMQ$Socket.recvZeroCopy(Native Method) at ZMQReadynessTest$1.run(ZMQReadynessTest.java:48)
ByteBuffer
また、上記の例外をスローしない単純な (直接バッファーではない) を使用しようとしました。何も返してくれません。
上記を解決する方法を知っている人はいますか?
byte[]
私はいくつかの高性能システムを実行しているので、オブジェクトをあちこちに作成したくありません。これが解決できない場合は、代わりに Unsafe を使用するだけです。でも本当は「あるべき姿」で働きたい。
前もって感謝します。
アレックス