4

ActiveMQ(Javaバージョン)で新しいプロデューサーとコンシューマーの接続(または接続割り込み)をリッスンする方法が見つからないようです。プロデューサーの接続が切断されたことを消費者に(または消費者が自分で知ることができるように)伝えられるようにしたいと思います。逆の方法(特定の消費者が切断されたことをプロデューサーが発見する)も必要です。

助けていただければ幸いです。

4

2 に答える 2

4

特定の宛先(特定のキューまたはトピック)で新しいプロデューサーとコンシューマーをリッスンしたいと思います。そうですか?

ConsumerEventSourceとProducerEventSourceをインスタンス化し、それぞれsetConsumerListenerとsetProducerListenerを呼び出すことで、独自のリスナーを登録できます。

それで:

Connection conn = yourconnection; // the connection your listener will use
Destination dest = yourdestination; // the destination you're paying attention to
ConsumerEventSource source = new ConsumerEventSource(conn, dest);
source.setConsumerListener(new ConsumerListener() {

   public void onConsumerEvent(ConsumerEvent event) {
      if (event.isStarted()) {
         System.out.println("a new consumer has started - " + event.getConsumerId());
      } else {
         System.out.println("a consumer has dropped - " + event.getConsumerId());
      }
   }

});

ConsumerEventSourceまたはProducerEventSourceのコードを見ると、これらはAdvisorySupportのメソッドを使用して、プロデューサーとコンシューマーに関するニュースをブロードキャストすることを目的とした特別なアドバイザリートピックをリッスンする単純なオブジェクトであることがわかります。これらのクラスのソースコードを読むことで、さらに学ぶことができます。

「接続」の使用は潜在的に問題です。ActiveMQランド(JMSランドのサブセット)では、「接続」は特定の宛先に関連付けられていない下位レベルのオブジェクトです。特定のクライアントは、接続から「セッション」を作成し(まだ宛先に固有ではありません)、宛先固有のQueueSender、QueueReceiver、TopicPublisher、またはTopicSubscriberを作成します。それらが作成されたとき、またはそれらを作成したセッションが終了したとき、それらはあなたが聞きたいイベントであり、上記のコードを使用すると聞きます。

于 2009-11-09T21:27:02.613 に答える
2

必要なすべての情報は、「ActiveMQ.Advisory.Connection」や単に「ActiveMQ.Advisory..>」などのActiveMQアドバイザリトピックで公開されています。Stomp Connectionで発生するイベントでさえ、ActiveMQアドバイザリトピックで公開されています。次のコードは、この例を示しています(Stompを介して接続されたFlexクライアントでテスト済み)。

ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("user", "password", ActiveMQConnection.DEFAULT_BROKER_URL);
Connection connection = connectionFactory.createConnection();
Session session = connection.createSession(transacted, ackMode);
connection.start();
Destination destinationAdvisory = session.createTopic("ActiveMQ.Advisory..>");
MessageConsumer consumerAdvisory = session.createConsumer(destinationAdvisory);
consumerAdvisory.setMessageListener(new MessageListener() {
    public void onMessage(Message message) {

            if (message instanceof ActiveMQMessage) {
                ActiveMQMessage activeMessage = (ActiveMQMessage) message;
                Object command = activeMessage.getDataStructure();
                if (command instanceof ConsumerInfo) {
                    System.out.println("A consumer subscribed to a topic or queue: " + command);
                } else if (command instanceof RemoveInfo) {
                    RemoveInfo removeInfo = (RemoveInfo) command;
                    if (removeInfo.isConsumerRemove()) {
                        System.out.println("A consumer unsubscribed from a topic or queue");
                    }
                    else {
                        System.out.println("RemoveInfo, a connection was closed: " + command);
                    }
                } else if (command instanceof ConnectionInfo) {
                    System.out.println("ConnectionInfo, a new connection was made: " + command);
                } else {
                    System.out.println("Unknown command: " + command);
                }
            }
    }
});
于 2009-11-12T11:46:00.473 に答える