3

特定のキューの「Number of Consumers」をすべて取り除こうとしています。キューをパージ/削除するたびに、同じ名前でそのキューを再度作成しても、コンシューマーの数は残ります。保留中のメッセージが 0 であっても、まだ 6 つのコンシューマがあります。

セッションまたは接続を閉じていないときに、Java コードで問題が発生した可能性があります。

サーバーの再起動と再インストールの両方を試みました。

これが私のプロデューサーコードです:

 private static String url = ActiveMQConnection.DEFAULT_BROKER_URL;

    public static String addElementToQueue(String queueName,String param1, String param2) throws JMSException, NamingException {
  // Getting JMS connection from the server and starting it
        ConnectionFactory connectionFactory =
                new ActiveMQConnectionFactory(url);
        Connection connection = connectionFactory.createConnection();

// JMS messages are sent and received using a Session. We will
        // create here a non-transactional session object. If you want
        // to use transactions you should set the first parameter to 'true'
        Session session = connection.createSession(false,
                Session.AUTO_ACKNOWLEDGE);

        // Destination represents here our queue on the
        // JMS server. You don't have to do anything special on the
        // server to create it, it will be created automatically.
        Destination destination = session.createQueue(queueName);

        // MessageProducer is used for sending messages (as opposed
        // to MessageConsumer which is used for receiving them)
        MessageProducer producer = session.createProducer(destination);   

        String queueMessage = param1+ "-" + param2;

        TextMessage message = session.createTextMessage(queueMessage);

        // Here we are sending the message!
        producer.send(message);

        connection.close();
        session.close();      // added after problem came up
        producer.close();     // added after problem came up

        return commandID;
}

これが私の消費者コードです:

 // URL of the JMS server
    private static String url = ActiveMQConnection.DEFAULT_BROKER_URL;

    public static Pair consumeNextElement(String queueName) throws JMSException {
        // Getting JMS connection from the server
        ConnectionFactory connectionFactory
                = new ActiveMQConnectionFactory(url);
        Connection connection = connectionFactory.createConnection();
        connection.start();   

        // Creating session for seding messages
        Session session = connection.createSession(false,
                Session.AUTO_ACKNOWLEDGE);

        // Getting the queue
        Destination destination = session.createQueue(queueName);

        // MessageConsumer is used for receiving (consuming) messages
        MessageConsumer consumer = session.createConsumer(destination);


        // Here we receive the message.
        // By default this call is blocking, which means it will wait
        // for a message to arrive on the queue.
        Message message = consumer.receive();

        // There are many types of Message and TextMessage
        // is just one of them. Producer sent us a TextMessage
        // so we must cast to it to get access to its .getText()
        // method.

        String[] parts = ((TextMessage)message).getText().split("-");
        Pair retVal = new Pair(parts[0], parts[1]);

        connection.close();
        session.close();        // added after problem came up
        consumer.close();      // added after problem came up  

        return retVal;
    }

何かご意見は?

ありがとう。

4

1 に答える 1

2

コンシューマーの数は、キュー上のリスナーの数です。キューを削除すると、キューに入れられたメッセージのみが削除されます。リッスンしているコンシューマーは影響を受けません。

接続を維持/再確立するコンシューマーの機能は、接続に使用されるトランスポートに依存する場合があり、トランスポートの設定により、接続プロパティの調整が可能になる場合があります。

率直に言って、これらの経験はあまりありませんが、接続のデバッグに役立つ手段としてアドバイザリメッセージを調査することをお勧めします。JMXインターフェースまたはWebコンソールは、消費者数の報告以外には役に立たないようです。

于 2012-11-07T05:10:10.593 に答える