Spring 3 + Active MQ Request/Response Synchronos を実装するサンプル プロジェクトを試しています。Spring 構成ファイル、メッセージをキューに入れるメッセージ プロデューサー、およびメッセージを消費して応答を返すメッセージ コンシューマーを作成しました...
応答が返ってきました...しかし、サンプル プログラムが終了していないようです... Apache Active MQ 管理コンソールを確認すると、テスト クラスを実行するたびに NUMber of Consumers の数が増え続けていることがわかります...管理コンソールでカウントを下げるために、Eclipseで手動で終了する必要がありました...
ここでこのスレッドを参照しました -スタック オーバーフロー スレッド- 同じ問題に直面した人..
だからここに私のコードがあります
<!-- creates an activemq connection factory using the amq namespace -->
<bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://localhost:61616" />
</bean>
<!--
<bean id="connectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
init-method="start" destroy-method="stop">
<property name="connectionFactory" ref="jmsConnectionFactory" />
<property name="maxConnections" value="100" />
</bean>
-->
<!-- CachingConnectionFactory Definition, sessionCacheSize property is the
number of sessions to cache -->
<bean id="connectionFactory"
class="org.springframework.jms.connection.CachingConnectionFactory">
<constructor-arg ref="jmsConnectionFactory" />
<property name="exceptionListener" ref="jmsExceptionListener" />
<property name="sessionCacheSize" value="1" />
<property name="cacheConsumers" value="false" />
<property name="cacheProducers" value="false" />
</bean>
<!-- JmsTemplate Definition -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<constructor-arg ref="connectionFactory" />
</bean>
<jms:listener-container connection-factory="connectionFactory">
<jms:listener id="request.queue.listener" destination="test.request"
ref="testMessageListener" />
</jms:listener-container>
<bean id="WorkerClient" class="com.vzwcorp.legal.eplm.active.mq.framework.WorkerClient" />
リクエスター クラス
@Component
パブリック クラス リクエスタ {
private static final class ProducerConsumer implements
SessionCallback<Message> {
private static final int TIMEOUT = 5000;
private final String msg;
private final DestinationResolver destinationResolver;
private final String queue;
public ProducerConsumer(final String msg, String queue,
final DestinationResolver destinationResolver) {
this.msg = msg;
this.queue = queue;
this.destinationResolver = destinationResolver;
}
public Message doInJms(final Session session) throws JMSException {
MessageConsumer consumer = null;
MessageProducer producer = null;
try {
final String correlationId = UUID.randomUUID().toString();
final Destination requestQueue = destinationResolver
.resolveDestinationName(session, queue + ".request",
false);
final Destination replyQueue = destinationResolver
.resolveDestinationName(session, queue + ".response",
false);
// Create the consumer first!
consumer = session.createConsumer(replyQueue,
"JMSCorrelationID = '" + correlationId + "'");
final TextMessage textMessage = session.createTextMessage(msg);
textMessage.setJMSCorrelationID(correlationId);
textMessage.setJMSReplyTo(replyQueue);
// Send the request second!
producer = session.createProducer(requestQueue);
producer.send(requestQueue, textMessage);
// Block on receiving the response with a timeout
return consumer.receive(TIMEOUT);
} finally {
// Don't forget to close your resources
JmsUtils.closeMessageConsumer(consumer);
JmsUtils.closeMessageProducer(producer);
}
}
}
private final JmsTemplate jmsTemplate;
@Autowired
public Requestor(final JmsTemplate jmsTemplate) {
this.jmsTemplate = jmsTemplate;
}
public String request(final String request, String queue) {
// Must pass true as the second param to start the connection
TextMessage message = (TextMessage) jmsTemplate.execute(
new ProducerConsumer(request, queue, jmsTemplate
.getDestinationResolver()), true);
try {
return message.getText();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
return "exception in requestor";
}
}
}
メッセージ リスナー クラス
@Component
public class TestMessageListener は MessageListener を実装します {
@Autowired
private WorkerClient WorkerClient;
@Override
public void onMessage(Message arg0) {
WorkerClient.delegateToClient(arg0);
}
}
ワーカー クライアント クラス
@Component
public class WorkerClient は ApplicationContextAware を実装します {
private ApplicationContext ctx;
private JmsTemplate jmsTemplate;
public void delegateToClient(Message arg0) {
MessageProducer producer = null;
if (arg0 instanceof TextMessage) {
try {
final TextMessage message = (TextMessage) arg0;
System.out.println("Message received by Listener: "
+ message.getJMSCorrelationID() + " - "
+ message.getText());
jmsTemplate.setDefaultDestination(message.getJMSReplyTo());
Session session = jmsTemplate.getConnectionFactory()
.createConnection()
.createSession(false, Session.AUTO_ACKNOWLEDGE);
producer = session.createProducer(message.getJMSReplyTo());
final TextMessage textMessage = session
.createTextMessage("I did it at last");
textMessage.setJMSCorrelationID(message.getJMSCorrelationID());
textMessage.setJMSReplyTo(message.getJMSReplyTo());
producer.send(message.getJMSReplyTo(), textMessage);
} catch (Exception e) {
e.printStackTrace();
} finally {
JmsUtils.closeMessageProducer(producer);
}
}
}
@Override
public void setApplicationContext(ApplicationContext arg0)
throws BeansException {
ctx = arg0;
this.jmsTemplate = (JmsTemplate) ctx.getBean("jmsTemplate");
}
ついにテストクラス
public class TestSync {
public static void main(String[] args) {
ApplicationContext ctx = new ClassPathXmlApplicationContext(
"activeMQConfiguration.xml");
OrderService orderService = (OrderService) ctx.getBean("orderService");
JmsTemplate jmsTemplate = (JmsTemplate) ctx.getBean("jmsTemplate");
Requestor req = new Requestor(jmsTemplate);
//CopyOfRequestor req = new CopyOfRequestor(jmsTemplate);
String response = req.request("Hello World", "test");
System.out.println(response);
}
}
では、メッセージ コンシューマーが戻ってきてテスト クラスが終了するのを修正するにはどうすればよいでしょうか。助けてください....