プロジェクトにアクティブな mq インターフェイスを実装することを学んでいます。これが、プロデューサーとコンシューマーを作成する方法です。
public void connectionSetup(String portName) { // portname is object of PortTO class. We are creating producer and consumer pair for every existing PortTO object.
Connection connection = null;
try {
if (timeToLive != 0) {
}
// Create the connection.
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);
connection = connectionFactory.createConnection();
connection.start();
connection.setExceptionListener(this);
// Create the session
Session session = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE);
if (topic) {
destination = session.createTopic(subject);
} else {
destination = session.createQueue(portName);
}
// Create the producer.
MessageProducer producer = session.createProducer(destination); if (persistent) {
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
} else {
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
}
MessageConsumer consumer = session.createConsumer(destination); if (timeToLive != 0)
producer.setTimeToLive(timeToLive);
mapOfSession.put(portName, session);
mapOfMessageProducer.put(portName, producer);
mapOfMessageConsumer.put(portName, consumer); log.info("Producer is " + producer);
log.info("Consumer is " + consumer);
} catch (Exception e) {
log.error(e.getMessage());
}
}
そのため、プロデューサーとコンシューマーを作成し、それらをすべての PortTO オブジェクトのマップに格納しています。現在、プロデューサーはメッセージを送信しています。
TextMessage message = session.createTextMessage();
message.setIntProperty(key, 2);
producer.send(message);
しかし、消費者はそれを消費していません...
public void onMessage(Message message) {
PortService portService = new PortService();
List<PortTO> portTOList = portService.getMoxaPorts();
for(PortTO portTO : portTOList) { // catching messages from producers of every PortTO object
MessageConsumer consumer = DataCollectionMessageProducer.getMapOfMessageConsumer().get(portTO.getPort()); // getting consumer from map of PortTO
consumer.setMessageListener(this);
message = consumer.receive(1000); if (message instanceof TextMessage) {
/ / some processing
}
} else {
if (verbose) {
}
}
}
}
その理由は何ですか?私のアプローチは間違っていますか??