0

2 つの異なるリモートの activeMQ ブローカー間で jms メッセージを移動しようとしています。

私はスタンドアロン アプリケーションを作成しているため、Atomikos を使用しています。また、Spring を使用してすべてを機能させています。

次のBean javaconfigセットアップがあります

@Bean(name="atomikosSrcConnectionFactory")
    public AtomikosConnectionFactoryBean consumerXAConnectionFactory() {
        AtomikosConnectionFactoryBean consumerBean = new AtomikosConnectionFactoryBean();
        consumerBean.setUniqueResourceName("atomikosSrcConnectionFactory");
        consumerBean.setLocalTransactionMode(false);
        return consumerBean;
    }

    @Bean(name="atomikosDstConnectionFactory")
    public AtomikosConnectionFactoryBean producerXAConnectionFactory() {
        AtomikosConnectionFactoryBean producerBean = new AtomikosConnectionFactoryBean();
        producerBean.setUniqueResourceName("atomikosDstConnectionFactory");
        producerBean.setLocalTransactionMode(false);
        return producerBean;
    }

    @Bean(name="jtaTransactionManager")
    public JtaTransactionManager jtaTransactionManager() throws SystemException {
        JtaTransactionManager jtaTM = new JtaTransactionManager();
        jtaTM.setTransactionManager(userTransactionManager());
        jtaTM.setUserTransaction(userTransactionImp());
        return jtaTM;
    }

    @Bean(initMethod="init", destroyMethod="close", name="userTransactionManager")
    public UserTransactionManager userTransactionManager() {
        UserTransactionManager utm = new UserTransactionManager();
        utm.setForceShutdown(false);
        return utm;
    }

    @Bean(name="userTransactionImp")
    public UserTransactionImp userTransactionImp() throws SystemException {
        UserTransactionImp uti = new UserTransactionImp();
        uti.setTransactionTimeout(300);
        return uti;
    }

    @Bean(name="jmsContainer")
    @Lazy(value=true)
    public DefaultMessageListenerContainer jmsContainer() throws SystemException {
        DefaultMessageListenerContainer dmlc = new DefaultMessageListenerContainer();
        dmlc.setAutoStartup(false);
        dmlc.setTransactionManager(jtaTransactionManager());
        dmlc.setSessionTransacted(true);
        dmlc.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
        dmlc.setConnectionFactory(consumerXAConnectionFactory());
        dmlc.setDestinationName("srcQueue");
        return dmlc;
    }

    @Bean(name="transactedJmsTemplate")
    public JmsTemplate transactedJmsTemplate() {

        DynamicDestinationResolver dest = new DynamicDestinationResolver();

        JmsTemplate jmsTmp = new JmsTemplate(producerXAConnectionFactory());

        jmsTmp.setDeliveryPersistent(true);
        jmsTmp.setSessionTransacted(true);
        jmsTmp.setDestinationResolver(dest);
        jmsTmp.setPubSubDomain(false);
        jmsTmp.setReceiveTimeout(20000);
        jmsTmp.setExplicitQosEnabled(true);
        jmsTmp.setSessionTransacted(true);
        jmsTmp.setDefaultDestination(new ActiveMQQueue("destQueue"));
        jmsTmp.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);

        return jmsTmp;
    }

DMLC を開始する前に、実行時に 2 つの AtomikosConnectionFactoryBean が ActiveMQXAConnectionFactory (ブローカーごとに 1 つ) をラップしています。

次に、次のメソッドを使用して、簡単な messageListener (開始前に dmlc に割り当てられます) をセットアップします。

public void onMessage(Message message) {
    final Message rcvedMsg = message;

    try{
        MessageCreator msgCreator = new MessageCreator(){
                public Message createMessage(Session session) throws JMSException{
                    Message returnMsg = null;
                    if(rcvedMsg instanceof TextMessage){
                        TextMessage txtMsg = session.createTextMessage();
                        txtMsg.setText(((TextMessage) rcvedMsg).getText());
                        returnMsg = txtMsg;
                    }
                    else if(rcvedMsg instanceof BytesMessage){
                        BytesMessage bytesMsg = session.createBytesMessage();
                        if(!(((BytesMessage) rcvedMsg).getBodyLength() > Integer.MAX_VALUE)){
                            byte[] bodyContent = new byte[(int) ((BytesMessage) rcvedMsg).getBodyLength()];
                            bytesMsg.writeBytes(bodyContent);
                            returnMsg = bytesMsg;
                        }
                    }
                    return returnMsg;
                }
            };

            jmsTemplate.send(msgCreator);
    }
    catch(JmsException | JMSException e){
        logger.error("Error when transfering message: '{}'. {}",message,e);
    }
}

アプリケーションは特定のエラーや警告なしで起動しますが、ソース キューにメッセージを入れるとすぐに、同じメッセージに対して onMessage メソッドが何度も実行されていることがログで確認できます。ロールバックして再起動します (エラーはどこにもスローされません)。

また、たまたま同じソース URL と宛先 URL (同じブローカーを意味しますが、それぞれに独自の connectionFactory があることを意味します) を使用すると、それが機能し、メッセージがソース キューと宛先キューの間で意図したとおりに転送されることにも気付きました。

私が疑問に思っているのは

  1. セットアップで何が間違っていますか? 2 つの異なるブローカーを使用しているのに、同じブローカーを使用している場合 (ただし 2 つの異なる接続ファクトリを使用している場合) にトランザクションが何度もロールバックされているように見えるのはなぜですか?
  2. 現在、すべての例外をキャッチして何もしていないため、onMessage が現在適切なトランザクションを行っているとは完全に確信していません。これにより、jmstemplate がメッセージの送信を完了する前に dmlc のトランザクションがコミットされると思いますが、確信が持てません。この場合、代わりに SessionAwareMessageListener の方がよいでしょうか? onMessage メソッドで @Transacted を設定する必要がありますか?

この問題に光を当てるのを手伝ってくれる人はいますか? すべての入力を歓迎します。

アップデート:

「ロールバック」の問題は、使用していた両方の AMQ がブローカーのネットワークを介して相互に接続されており、たまたまソースと宛先に同じキュー名を使用していたことが原因であることに気付きました。これにより、メッセージがアプリケーションによってある AMQ から別の AMQ に転送された後、すぐに、ソース AMQ にコンシューマーがあったため、メッセージは元の AMQ に転送され、元の AMQ として認識されました。アプリケーションによって新しいメッセージが送信され、再度転送され、サイクルは無限に続きました。以下に投稿された解決策は、他の問題に役立ちました。

4

1 に答える 1

0
try {
   ... Code
} catch (JmsException je) {
    logger.error("Error when transfering message: '{}'. {}",message,e);
}

上記のコードは例外を飲み込んでいます。トランザクション管理が適切に処理できるように、例外をキャッチしないか、再スローする必要があります。現在、例外は見られず、奇妙な結果につながる可能性があるコミットが実行されます。

私は次のようなことをします.SpringからのものでJmsExceptionあり、Springのほとんどの例外としてRuntimeException. また、例外スタックトレースを適切にログに記録する{}には、ログステートメントの2番目を削除します。

try {
   ... Code
} catch (JmsException je) {
    logger.error("Error when transfering message: '{}'.",message,e);
    throw je;
}

ただし、Spring もエラーをログに記録するため、これによりログが複製されます。

の場合、JMSExceptionこのようなことを行い、に変換しJmsExceptionます。

try {
   ... Code
} catch (JMSException je) {
    logger.error("Error when transfering message: '{}'.",message,e);
    throw JmsUtils.convertJmsAccessException(je);
}

何が起こったのかについてより多くの情報を得るには、おそらくorg.springframework.jmsパッケージの DEBUG ロギングを有効にする必要があります。これにより、メッセージの送受信時に何が起こるかについての洞察が得られます。

message.acknowledge()トランザクション セッションとメッセージの手動確認を使用するもう 1 つのことですが、コードでa を実行しません。JTA トランザクションのため、Spring はそれを呼び出しません。SESSION_TRANSACTED代わりに切り替えてみてください。少なくともDMLC.

于 2014-02-27T15:38:03.753 に答える