ServiceBus のオンプレミス インストールがあります。再配達の際に挙動がおかしくなりました。サンプル アプリケーションを作成しました。以下を参照してください。このアプリケーションは、5 つのメッセージを永続キューでトピックに投稿し、すべてのメッセージを読み取ろうとします。メッセージごとに、新しいセッションを作成し、メッセージ配信を開始/停止します (qpid バックグラウンド スレッドの適切な開始/停止にはタイムアウトが必要です)。プリフェッチにより、メモの読み取りメッセージがロックされると思います。最終的にはすべてのメッセージを取得できると期待していますが、一部のメッセージが失われています。最大配信数は 10 に設定されています。キューは Service Bus Explorer によって検査されました。テスト後は空であり、不足しているメッセージは Dealetter キューにありません。
動作を明示するために使用されるテスト。これは、メッセージを消費する方法ではありません。
private static final long RECEIVE_TIMEOUT_MS = 5000;
private static final long SLEEP_BETWEEN_SESSIONS_MS = 1000;
private static final long SLEEP_PEEK_TIMEOUT_MS = 70000;
private static final int MAX_SUBSEQUENT_FAILURES = 3;
private static final int MESSAGES_TO_TEST = 5;
// send some messages to empty queue
for (int i = 0; i < MESSAGES_TO_TEST; i++) {
testSendToTopic(connection, context, Integer.toString(i));
Thread.sleep(SLEEP_BETWEEN_SESSIONS_MS);
}
// wait for message
OUTER:
for (int i = 0; i < MESSAGES_TO_TEST; i++) {
Thread.sleep(SLEEP_BETWEEN_SESSIONS_MS);
int subsequentFailures = 0;
while (!testReceiveFromQueue(connection, context)) {
log.info("Waiting for peek lock timeout");
Thread.sleep(SLEEP_PEEK_TIMEOUT_MS);
subsequentFailures++;
if (subsequentFailures > MAX_SUBSEQUENT_FAILURES) {
log.info("Giving up");
break OUTER;
}
}
}
これはログです。メッセージ 1、2、および 4 が失われました。
19:05:51,012 [ main] INFO [Qpid] Message sent, id: 0
19:05:52,039 [ main] INFO [Qpid] Message sent, id: 1
19:05:53,055 [ main] INFO [Qpid] Message sent, id: 2
19:05:54,074 [ main] INFO [Qpid] Message sent, id: 3
19:05:55,088 [ main] INFO [Qpid] Message sent, id: 4
19:05:57,131 [ main] INFO [Qpid] Received message, id: 0, redelivered: false
19:05:57,133 [ main] INFO [Qpid] Message acknowledged
19:06:03,342 [ main] INFO [Qpid] Queue is empty
19:06:03,345 [ main] INFO [Qpid] Waiting for peek lock timeout
19:07:13,358 [ main] INFO [Qpid] Received message, id: 4, redelivered: true
19:07:13,359 [ main] INFO [Qpid] Message acknowledged
19:07:19,367 [ main] INFO [Qpid] Queue is empty
19:07:19,370 [ main] INFO [Qpid] Waiting for peek lock timeout
19:08:34,379 [ main] INFO [Qpid] Queue is empty
19:08:34,381 [ main] INFO [Qpid] Waiting for peek lock timeout
19:09:49,400 [ main] INFO [Qpid] Queue is empty
19:09:49,402 [ main] INFO [Qpid] Waiting for peek lock timeout
19:11:04,417 [ main] INFO [Qpid] Queue is empty
19:11:04,419 [ main] INFO [Qpid] Waiting for peek lock timeout
19:12:14,423 [ main] INFO [Qpid] Giving up
不足しているメソッド (EDITED、簡略化されたコード):
static void testSendToTopic(Connection connection, Context context) throws JMSException,
NamingException {
Session session = null;
MessageProducer messageProducer = null;
try {
session = connection.createSession(false/*transacted*/, Session.AUTO_ACKNOWLEDGE);
Topic topic = (Topic) context.lookup("ORDER_HISTORY_TOPIC");
messageProducer = session.createProducer(topic);
TextMessage message = session.createTextMessage("Hello MS SB");
message.setJMSDeliveryMode(DeliveryMode.PERSISTENT);
messageProducer.send(message);
log.info("Message sent");
} finally {
if (null != messageProducer)
messageProducer.close();
if (null != session)
session.close();
}
}
static boolean testReceiveFromQueue(Connection connection, Context context)
throws JMSException,
NamingException {
Session session = null;
MessageConsumer consumer = null;
try {
session = connection.createSession(false/*transacted*/, Session.CLIENT_ACKNOWLEDGE);
Queue queue = (Queue) context.lookup("ORDER_HISTORY_QUEUE");
consumer = session.createConsumer(queue);
// start delivery of incoming messages, otherwise receiveXXX will not get any
connection.start();
// even when there are messages, receiveNoWait may return null
Message message = consumer.receive(RECEIVE_TIMEOUT_MS);
if (null == message) {
log.info("Nothing to receive");
return false;
}
log.info("Received message");
// must be acknowledged before peek lock expires (see Lock Duration)
// Until peek lock timeout the message is will not be delivered to other receivers
// on the same subscription
message.acknowledge();
log.info("Acknowledged");
return true;
} finally {
connection.stop();
if (consumer != null)
consumer.close();
if (null != session)
session.close();
}
}