1

Spring 3 + Active MQ Request/Response Synchronos を実装するサンプル プロジェクトを試しています。Spring 構成ファイル、メッセージをキューに入れるメッセージ プロデューサー、およびメッセージを消費して応答を返すメッセージ コンシューマーを作成しました...

応答が返ってきました...しかし、サンプル プログラムが終了していないようです... Apache Active MQ 管理コンソールを確認すると、テスト クラスを実行するたびに NUMber of Consumers の数が増え続けていることがわかります...管理コンソールでカウントを下げるために、Eclipseで手動で終了する必要がありました...

ここでこのスレッドを参照しました -スタック オーバーフロー スレッド- 同じ問題に直面した人..

また、ここここを参照して、ソリューションを作成しました

だからここに私のコードがあります

    <!-- creates an activemq connection factory using the amq namespace -->

<bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    <property name="brokerURL" value="tcp://localhost:61616" />
</bean>

<!-- 
<bean id="connectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
    init-method="start" destroy-method="stop">
    <property name="connectionFactory" ref="jmsConnectionFactory" />
    <property name="maxConnections" value="100" />
</bean>
 -->

<!-- CachingConnectionFactory Definition, sessionCacheSize property is the 
    number of sessions to cache -->

<bean id="connectionFactory"
    class="org.springframework.jms.connection.CachingConnectionFactory">
    <constructor-arg ref="jmsConnectionFactory" />
    <property name="exceptionListener" ref="jmsExceptionListener" />
    <property name="sessionCacheSize" value="1" />
    <property name="cacheConsumers" value="false" />
    <property name="cacheProducers" value="false" />
</bean>

<!-- JmsTemplate Definition -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    <constructor-arg ref="connectionFactory" />

</bean>


<jms:listener-container connection-factory="connectionFactory">
    <jms:listener id="request.queue.listener" destination="test.request"
        ref="testMessageListener" />
</jms:listener-container>

<bean id="WorkerClient" class="com.vzwcorp.legal.eplm.active.mq.framework.WorkerClient" />

リクエスター クラス

@Component

パブリック クラス リクエスタ {

private static final class ProducerConsumer implements
        SessionCallback<Message> {

    private static final int TIMEOUT = 5000;

    private final String msg;

    private final DestinationResolver destinationResolver;

    private final String queue;

    public ProducerConsumer(final String msg, String queue,
            final DestinationResolver destinationResolver) {
        this.msg = msg;
        this.queue = queue;
        this.destinationResolver = destinationResolver;
    }

    public Message doInJms(final Session session) throws JMSException {
        MessageConsumer consumer = null;
        MessageProducer producer = null;
        try {
            final String correlationId = UUID.randomUUID().toString();
            final Destination requestQueue = destinationResolver
                    .resolveDestinationName(session, queue + ".request",
                            false);
            final Destination replyQueue = destinationResolver
                    .resolveDestinationName(session, queue + ".response",
                            false);
            // Create the consumer first!
            consumer = session.createConsumer(replyQueue,
                    "JMSCorrelationID = '" + correlationId + "'");
            final TextMessage textMessage = session.createTextMessage(msg);
            textMessage.setJMSCorrelationID(correlationId);
            textMessage.setJMSReplyTo(replyQueue);
            // Send the request second!
            producer = session.createProducer(requestQueue);
            producer.send(requestQueue, textMessage);
            // Block on receiving the response with a timeout
            return consumer.receive(TIMEOUT);
        } finally {
            // Don't forget to close your resources
            JmsUtils.closeMessageConsumer(consumer);
            JmsUtils.closeMessageProducer(producer);
        }
    }
}

private final JmsTemplate jmsTemplate;

@Autowired
public Requestor(final JmsTemplate jmsTemplate) {
    this.jmsTemplate = jmsTemplate;
}

public String request(final String request, String queue) {
    // Must pass true as the second param to start the connection
    TextMessage message = (TextMessage) jmsTemplate.execute(
            new ProducerConsumer(request, queue, jmsTemplate
                    .getDestinationResolver()), true);
    try {
        return message.getText();
    } catch (JMSException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
        return "exception in requestor";
    }
}

}

メッセージ リスナー クラス

@Component

public class TestMessageListener は MessageListener を実装します {

@Autowired
private WorkerClient WorkerClient;

@Override
public void onMessage(Message arg0) {
    WorkerClient.delegateToClient(arg0);

}

}

ワーカー クライアント クラス

@Component

public class WorkerClient は ApplicationContextAware を実装します {

private ApplicationContext ctx;

private JmsTemplate jmsTemplate;

public void delegateToClient(Message arg0) {
    MessageProducer producer = null;
    if (arg0 instanceof TextMessage) {
        try {
            final TextMessage message = (TextMessage) arg0;

            System.out.println("Message received by Listener: "
                    + message.getJMSCorrelationID() + " - "
                    + message.getText());

            jmsTemplate.setDefaultDestination(message.getJMSReplyTo());

            Session session = jmsTemplate.getConnectionFactory()
                    .createConnection()
                    .createSession(false, Session.AUTO_ACKNOWLEDGE);
            producer = session.createProducer(message.getJMSReplyTo());
            final TextMessage textMessage = session
                    .createTextMessage("I did it at last");
            textMessage.setJMSCorrelationID(message.getJMSCorrelationID());
            textMessage.setJMSReplyTo(message.getJMSReplyTo());
            producer.send(message.getJMSReplyTo(), textMessage);

        } catch (Exception e) {
            e.printStackTrace();

        } finally {
            JmsUtils.closeMessageProducer(producer);
        }
    }
}

@Override
public void setApplicationContext(ApplicationContext arg0)
        throws BeansException {
    ctx = arg0;
    this.jmsTemplate = (JmsTemplate) ctx.getBean("jmsTemplate");
}

ついにテストクラス

public class TestSync {

public static void main(String[] args) {

    ApplicationContext ctx = new ClassPathXmlApplicationContext(
            "activeMQConfiguration.xml");
    OrderService orderService = (OrderService) ctx.getBean("orderService");
    JmsTemplate jmsTemplate = (JmsTemplate) ctx.getBean("jmsTemplate");
     Requestor req = new Requestor(jmsTemplate);
    //CopyOfRequestor req = new CopyOfRequestor(jmsTemplate);
    String response = req.request("Hello World", "test");
    System.out.println(response);

}

}

では、メッセージ コンシューマーが戻ってきてテスト クラスが終了するのを修正するにはどうすればよいでしょうか。助けてください....

4

1 に答える 1

0

基本的に問題は、JMS リスナー コンテナによって伝播されるスレッドが原因で、アプリケーション コンテキストが存続することです。修正は、メイン メソッドの最後で ApplicationContext.close() を明示的に呼び出すことです。これにより、すべての Bean (JMS リスナーを含む) が正常にシャットダウンされます。

より良い修正は、Spring Test Support を使用することです。これは、明示的に行う必要があるのではなく、アプリケーション コンテキストの初期化とシャットダウンを処理します。

@RunWith(SpringJUnit4Runner.class)
@ContextConfiguration(...)
public class TestSync{

    @Autowired OrderService orderService;

    @Test
    public void testJMS(){
        ...
    }

}
于 2012-11-15T01:45:39.593 に答える