0

ServiceBus のオンプレミス インストールがあります。再配達の際に挙動がおかしくなりました。サンプル アプリケーションを作成しました。以下を参照してください。このアプリケーションは、5 つのメッセージを永続キューでトピックに投稿し、すべてのメッセージを読み取ろうとします。メッセージごとに、新しいセッションを作成し、メッセージ配信を開始/停止します (qpid バックグラウンド スレッドの適切な開始/停止にはタイムアウトが必要です)。プリフェッチにより、メモの読み取りメッセージがロックされると思います。最終的にはすべてのメッセージを取得できると期待していますが、一部のメッセージが失われています。最大配信数は 10 に設定されています。キューは Service Bus Explorer によって検査されました。テスト後は空であり、不足しているメッセージは Dealetter キューにありません。

動作を明示するために使用されるテスト。これは、メッセージを消費する方法ではありません。

private static final long RECEIVE_TIMEOUT_MS = 5000;
private static final long SLEEP_BETWEEN_SESSIONS_MS = 1000;
private static final long SLEEP_PEEK_TIMEOUT_MS = 70000;
private static final int MAX_SUBSEQUENT_FAILURES = 3;
private static final int MESSAGES_TO_TEST = 5;

        // send some messages to empty queue
        for (int i = 0; i < MESSAGES_TO_TEST; i++) {
            testSendToTopic(connection, context, Integer.toString(i));
            Thread.sleep(SLEEP_BETWEEN_SESSIONS_MS);
        }

        // wait for message
        OUTER:
        for (int i = 0; i < MESSAGES_TO_TEST; i++) {
            Thread.sleep(SLEEP_BETWEEN_SESSIONS_MS);
            int subsequentFailures = 0;

            while (!testReceiveFromQueue(connection, context)) {
                log.info("Waiting for peek lock timeout");

                Thread.sleep(SLEEP_PEEK_TIMEOUT_MS);

                subsequentFailures++;
                if (subsequentFailures > MAX_SUBSEQUENT_FAILURES) {
                    log.info("Giving up");
                    break OUTER;
                }
            }
        }

これはログです。メッセージ 1、2、および 4 が失われました。

    19:05:51,012 [      main] INFO  [Qpid] Message sent, id: 0
    19:05:52,039 [      main] INFO  [Qpid] Message sent, id: 1
    19:05:53,055 [      main] INFO  [Qpid] Message sent, id: 2
    19:05:54,074 [      main] INFO  [Qpid] Message sent, id: 3
    19:05:55,088 [      main] INFO  [Qpid] Message sent, id: 4
    19:05:57,131 [      main] INFO  [Qpid] Received message, id: 0, redelivered: false
    19:05:57,133 [      main] INFO  [Qpid] Message acknowledged
    19:06:03,342 [      main] INFO  [Qpid] Queue is empty
    19:06:03,345 [      main] INFO  [Qpid] Waiting for peek lock timeout
    19:07:13,358 [      main] INFO  [Qpid] Received message, id: 4, redelivered: true
    19:07:13,359 [      main] INFO  [Qpid] Message acknowledged
    19:07:19,367 [      main] INFO  [Qpid] Queue is empty
    19:07:19,370 [      main] INFO  [Qpid] Waiting for peek lock timeout
    19:08:34,379 [      main] INFO  [Qpid] Queue is empty
    19:08:34,381 [      main] INFO  [Qpid] Waiting for peek lock timeout
    19:09:49,400 [      main] INFO  [Qpid] Queue is empty
    19:09:49,402 [      main] INFO  [Qpid] Waiting for peek lock timeout
    19:11:04,417 [      main] INFO  [Qpid] Queue is empty
    19:11:04,419 [      main] INFO  [Qpid] Waiting for peek lock timeout
    19:12:14,423 [      main] INFO  [Qpid] Giving up

不足しているメソッド (EDITED、簡略化されたコード):

static void testSendToTopic(Connection connection, Context context) throws JMSException,
        NamingException {
    Session session = null;
    MessageProducer messageProducer = null;
    try {
        session = connection.createSession(false/*transacted*/, Session.AUTO_ACKNOWLEDGE);

        Topic topic = (Topic) context.lookup("ORDER_HISTORY_TOPIC");
        messageProducer = session.createProducer(topic);
        TextMessage message = session.createTextMessage("Hello MS SB");
        message.setJMSDeliveryMode(DeliveryMode.PERSISTENT);
        messageProducer.send(message);
        log.info("Message sent");
    } finally {
        if (null != messageProducer)
            messageProducer.close();
        if (null != session)
            session.close();
    }
}

static boolean testReceiveFromQueue(Connection connection, Context context)
        throws JMSException,
        NamingException {
    Session session = null;
    MessageConsumer consumer = null;
    try {
        session = connection.createSession(false/*transacted*/, Session.CLIENT_ACKNOWLEDGE);

        Queue queue = (Queue) context.lookup("ORDER_HISTORY_QUEUE");
        consumer = session.createConsumer(queue);

        // start delivery of incoming messages, otherwise receiveXXX will not get any
        connection.start();

        // even when there are messages, receiveNoWait may return null
        Message message = consumer.receive(RECEIVE_TIMEOUT_MS);
        if (null == message) {
            log.info("Nothing to receive");

            return false;
        }

        log.info("Received message");

        // must be acknowledged before peek lock expires (see Lock Duration)
        // Until peek lock timeout the message is will not be delivered to other receivers
        // on the same subscription
        message.acknowledge();
        log.info("Acknowledged");

        return true;
    } finally {
        connection.stop();
        if (consumer != null)
            consumer.close();
        if (null != session)
            session.close();
    }
}
4

2 に答える 2