これにはばかげた単純な答えがあるかもしれませんが、ActiveMQ を使用してプロデューサーとコンシューマーの間でメッセージをやり取りしようとしています。多くのプロデューサーと多くのコンシューマーを用意しますが、各メッセージをコンシューマー間で 1 回だけ配信したいと考えています。これは、トピックを使用できないことを意味しているように思われます。なぜなら、トピックはリッスンしているすべてのコンシューマーにメッセージを配信し、各メッセージを受信するコンシューマーは 1 つだけにしたいからです。
私の問題は、メッセージを受信できることですが、メッセージがキューから取り出されないことです。したがって、コンシューマ プロセスを再起動すると、すべてのメッセージが再処理されます。 この回答は適切なようですが、永続的なキュー コンシューマーを作成できず、永続的なトピック コンシューマーのみを作成できないため (API ドキュメントに何かが欠けていない限り)、適用されないようです。
私のコードは次のとおりです。
TopicConnectionFactory factory = new ActiveMQConnectionFactory(props.getProperty("mq.url"));
Connection conn = factory.createConnection();
Session session = conn.createSession(true, Session.CLIENT_ACKNOWLEDGE);
Queue queue = session.createQueue(props.getProperty("mq.source_queue"));
conn.start();
MessageConsumer consumer = session.createConsumer(queue);
それから後で
Message msg = consumer.receive();
msg.acknowledge();
if (!(msg instanceof TextMessage)) continue;
String msgStr = ((TextMessage)msg).getText();
これが私の現在のコードです。Session.AUTO_ACKNOWLEDGE と msg.acknowledge() なしで試しました。このコードの動作順列はメッセージを取得するように見えますが、コンシューマを再起動すると、再起動前に受信されていたとしても、すべてのメッセージが再び受信されます。