2

コンシューマに関連付けられた MessageListener がメッセージの処理を終了したときに通知を受け取ることができるように、ActiveMQ からのアドバイザリまたはその他の代替サポートを実際に探しています。

MessageDelivered アドバイザリは、ブローカーがメッセージを受け取るとすぐに通知するようです。また、MessageConsumed アドバイザリは、コンシューマーがメッセージを受信したときに通知すると主張しています。

- - - - - - - - - - - - アップデート - - - - - - - - - - - - - -----

以下のコード スニペットを見つけてください。

public class SampleListener implements MessageListener {

    private Session session;

    public SampleListener(Session session) {
        this.session = session; 
    }

    public void onMessage(Message message) {
        try {
             // do something
             session.commit();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

public class SampleConsumer {

    private boolean stopConnection = false;

    public static void main(String[] args) {
        new SampleConsumer().start();
    }

    public void start() {
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
            Connection connection;
            try {
                connection = connectionFactory.createConnection();
                connection.start();
                Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
                Destination destination = session.createTopic("test");
                MessageConsumer messageConsumer = session.createConsumer(destination);
                messageConsumer.setMessageListener(new SampleListener(session));

                try {
                    synchronized (this) {
                        while (!stopConnection) {
                            wait();
                        }
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    session.close();
                    connection.close();
                }

            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }

    public void stop() {
        synchronized (this) {
            stopConnection = true;
            notify();
        }
    }
}


public class SampleProducer implements MessageListener {

    private boolean messageDelivered;

    @Test
    public void shouldTestSomething() throws JMSException, InterruptedException {
        producerConnection = new ActiveMQConnectionFactory("tcp://localhost:61616").createConnection();
        producerConnection.start();
        Session session = producerConnection.createSession(true, SESSION_TRANSACTED);

        Destination destination = session.createTopic("test");
        MessageConsumer advisoryConsumer = session.createConsumer(AdvisorySupport.getMessageConsumedAdvisoryTopic(destination));
        advisoryConsumer.setMessageListener(this);

        Message message = session.createTextMessage("Hi");
        Destination destination = session.createTopic("test");
        MessageProducer producer = session.createProducer(destination);
        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
        producer.send(message);
        session.commit();

        synchronized (this) {
            while (!messageDelivered) {
                wait();
            }
        }

        session.close();

        // some assertions
    }

    public void onMessage(Message message) {
        // do something

        synchronized (this) {
            messageDelivered = true;
            notify();
        }
    }
}
4

1 に答える 1

1

一部のアドバイザリは、デフォルトでは有効になっていません。リンク先:

http://activemq.apache.org/advisory-message.html

無効なアドバイザリは、policyEntry を activemq.xml http://activemq.apache.org/xml-configuration.htmlに追加することで有効にできます。

以下を activemq.xml に追加します。

     <destinationPolicy>
        <policyMap>
          <policyEntries>
            <policyEntry topic=">" advisoryForConsumed="true" />
            <policyEntry  queue=">" advisoryForConsumed="true" />
            ..            
          </policyEntries>
        </policyMap>
    </destinationPolicy>

アドバイザリを有効にしてコンシューマーで session.commit() を呼び出すと、アドバイザリが配信されます。

組み込みブローカーを使用している場合は、activemq.xml をクラスパスに配置し、以下を使用してブローカーを開始できます。

BrokerService broker = BrokerFactory.createBroker("xbean:activemq.xml",true);

(activemq.xml を使用せずに、無効になっているアドバイザリを有効にする方法が見つかりませんでした)。

于 2012-11-05T11:52:17.410 に答える