1

こんにちは、私はアクティブな mq を使用して非同期メッセージを受信できません。以下のコードは、メッセージのパブリッシュとサブスクライブに使用しています。

    public class publishMessage extends HttpServlet {
    protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {

       InitialContext initCtx = new InitialContext();
            Context envContext = (Context) initCtx.lookup("java:comp/env");
            ActiveMQConnectionFactory connectionFactory = (ActiveMQConnectionFactory) envContext.lookup("jms/ConnectionFactory");
            Connection connection = connectionFactory.createConnection();
            connection.start() ;
            Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
            Destination destination = session.createQueue("jms/topic/MyQueue");
            MessageProducer producer = session.createProducer(destination);
            producer.setDeliveryMode(DeliveryMode.PERSISTENT) ;
            TextMessage msg=session.createTextMessage();
            msg.setText("Message sent");
                    System.out.println("Message Sent");
            producer.send(msg);
            session.commit() ;
            connection.close() ;
            }catch(Exception ex){
                ex.printStackTrace() ;
            }
    }}


class Consumer{

    protected Queue queue;


    protected String queueName = "jms/topic/MyQueue";

    protected String url = ActiveMQConnection.DEFAULT_BROKER_URL;

    protected int ackMode = Session.AUTO_ACKNOWLEDGE;

    public static void main(String[] args) {
        Consumer rec=new Consumer();
        try {
            rec.run();
        } catch (Exception e) {
            e.printStackTrace();
        }

    }

    public void run() throws JMSException{

        System.out.println("URL:" + url);
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
        TopicConnection connection = (TopicConnection)connectionFactory.createTopicConnection();
        connection.setClientID("Testingconn1") ;
        connection.start();
        MessageConsumer consumer = null;
        Session session = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE);
        queue = session.createQueue(queueName) ;
        consumer = session.createConsumer(queue);
        consumer.setMessageListener(new MyListener()) ;
        session.commit() ;
        consumer.close();
        session.close();
        connection.close();
    }

}

package com.java;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

public class MyListener implements MessageListener{

    @Override
    public void onMessage( final Message message )
    {
        if ( message instanceof TextMessage )
        {
            final TextMessage textMessage = (TextMessage) message;
            try
            {
                System.out.println("Listener:" +  textMessage.getText() );
            }
            catch (final JMSException e)
            {
                e.printStackTrace();
            }
        }

    }
}

上記のコードを実行すると、パブリッシャーはメッセージを正常に送信し、コンシューマーを起動すると必要に応じて出力が表示されますが、コンソールには出力が表示されません。

それを解決したり、非同期メッセージを受信するためのコードを提案したりしてください。

4

2 に答える 2

1

あなたの消費者コードは、実際には消費者にメッセージを消費する時間を与えません。非同期コンシューマーは、コンシューマーがしばらく存続し、アプリが着信メッセージを処理できる場合に使用されます。上記のコードでは、MessageConsumer の timed receive メソッド (consumer.receive(5000) など) またはアプリに適した期間を使用することをお勧めします。ブローカーがコンシューマーを登録してメッセージをルーティングするには時間がかかるため、コンシューマーは作成直後にメッセージを取得できるとは限りません。そのため、アプリでそれを許可する必要があります。

于 2012-09-28T10:28:26.123 に答える
1

あなたの消費者クラスは正しいです。それはスムーズに実行されます。

しかし、消費者は間違っているので、変更する必要があります。

  • まず、接続オブジェクトを作成した後に setClientID("any_string_value") を追加します。

     eg: Connection connection = connectionFactory.createConnection();
        // need to setClientID value, any string value you wish
        connection.setClientID("12345");
    
    • 次に、トピックを介してメッセージを送信するために createConsumer() の代わりに createDurableSubscriber() メソッドを使用します。

       MessageConsumer consumer = session.createDurableSubscriber(topic,"SUB1234");
      

変更されたコンシューマ クラスは次のとおりです。

package mq.test;

import javax.jms.*;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class consumer {
    // URL of the JMS server
    private static String url = ActiveMQConnection.DEFAULT_BROKER_URL;

    // Name of the topic from which we will receive messages from = " testt"

    public static void main(String[] args) throws JMSException {
        // Getting JMS connection from the server

        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
        Connection connection = connectionFactory.createConnection();

        // need to setClientID value, any string value you wish
        connection.setClientID("12345");

        try{
        connection.start();
        }catch(Exception e){
            System.err.println("NOT CONNECTED!!!");
        }
        Session session = connection.createSession(false,
                Session.AUTO_ACKNOWLEDGE);

        Topic topic = session.createTopic("test_data");

        //need to use createDurableSubscriber() method instead of createConsumer() for topic
        // MessageConsumer consumer = session.createConsumer(topic);
        MessageConsumer consumer = session.createDurableSubscriber(topic,
                "SUB1234");

        MessageListener listner = new MessageListener() {
            public void onMessage(Message message) {
                try {
                    if (message instanceof TextMessage) {
                        TextMessage textMessage = (TextMessage) message;
                        System.out.println("Received message"
                                + textMessage.getText() + "'");
                    }
                } catch (JMSException e) {
                    System.out.println("Caught:" + e);
                    e.printStackTrace();
                }
            }
        };

        consumer.setMessageListener(listner);
        //connection.close();

    }
}
于 2014-08-29T10:34:57.547 に答える