3

これにはばかげた単純な答えがあるかもしれませんが、ActiveMQ を使用してプロデューサーとコンシューマーの間でメッセージをやり取りしようとしています。多くのプロデューサーと多くのコンシューマーを用意しますが、各メッセージをコンシューマー間で 1 回だけ配信したいと考えています。これは、トピックを使用できないことを意味しているように思われます。なぜなら、トピックはリッスンしているすべてのコンシューマーにメッセージを配信し、各メッセージを受信するコンシューマーは 1 つだけにしたいからです。

私の問題は、メッセージを受信できることですが、メッセージがキューから取り出されないことです。したがって、コンシューマ プロセスを再起動すると、すべてのメッセージが再処理されます。 この回答は適切なようですが、永続的なキュー コンシューマーを作成できず、永続的なトピック コンシューマーのみを作成できないため (API ドキュメントに何かが欠けていない限り)、適用されないようです。

私のコードは次のとおりです。

TopicConnectionFactory factory = new ActiveMQConnectionFactory(props.getProperty("mq.url"));
Connection conn  = factory.createConnection();
Session session = conn.createSession(true, Session.CLIENT_ACKNOWLEDGE);
Queue queue = session.createQueue(props.getProperty("mq.source_queue"));
conn.start();
MessageConsumer consumer = session.createConsumer(queue);

それから後で

Message msg = consumer.receive();
msg.acknowledge();
if (!(msg instanceof TextMessage)) continue;
String msgStr = ((TextMessage)msg).getText();

これが私の現在のコードです。Session.AUTO_ACKNOWLEDGE と msg.acknowledge() なしで試しました。このコードの動作順列はメッセージを取得するように見えますが、コンシューマを再起動すると、再起動前に受信されていたとしても、すべてのメッセージが再び受信されます。

4

1 に答える 1

5

セッションをトランザクション セッションとして作成したため、すべてのメッセージが消費され、再配信する必要がないことをブローカーに通知する場合は、session.commit を呼び出す必要があります。createSession の最初の引数を true に設定しないと、Ack モードが尊重されます。それ以外の場合は無視されます。これは、JMS API の奇妙な点の 1 つです。これを行う場合:

ConnectionFactory factory = new ActiveMQConnectionFactory(props.getProperty("mq.url"));
Connection conn  = factory.createConnection();
Session session = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Queue queue = session.createQueue(props.getProperty("mq.source_queue"));
conn.start();
MessageConsumer consumer = session.createConsumer(queue);

次に、これは機能します:

Message msg = consumer.receive();
msg.acknowledge();

それ以外の場合は、次のことを行う必要があります。

Message msg = consumer.receive();
session.commit(); 

ただし、単一のメッセージの場合、トランザクションはクライアントの ack に対して実際には意味をなさないことに注意してください。トランザクションを使用しない方が良いオプションです。

于 2012-06-30T20:17:24.860 に答える