2 つの異なるリモートの activeMQ ブローカー間で jms メッセージを移動しようとしています。
私はスタンドアロン アプリケーションを作成しているため、Atomikos を使用しています。また、Spring を使用してすべてを機能させています。
次のBean javaconfigセットアップがあります
@Bean(name="atomikosSrcConnectionFactory")
public AtomikosConnectionFactoryBean consumerXAConnectionFactory() {
AtomikosConnectionFactoryBean consumerBean = new AtomikosConnectionFactoryBean();
consumerBean.setUniqueResourceName("atomikosSrcConnectionFactory");
consumerBean.setLocalTransactionMode(false);
return consumerBean;
}
@Bean(name="atomikosDstConnectionFactory")
public AtomikosConnectionFactoryBean producerXAConnectionFactory() {
AtomikosConnectionFactoryBean producerBean = new AtomikosConnectionFactoryBean();
producerBean.setUniqueResourceName("atomikosDstConnectionFactory");
producerBean.setLocalTransactionMode(false);
return producerBean;
}
@Bean(name="jtaTransactionManager")
public JtaTransactionManager jtaTransactionManager() throws SystemException {
JtaTransactionManager jtaTM = new JtaTransactionManager();
jtaTM.setTransactionManager(userTransactionManager());
jtaTM.setUserTransaction(userTransactionImp());
return jtaTM;
}
@Bean(initMethod="init", destroyMethod="close", name="userTransactionManager")
public UserTransactionManager userTransactionManager() {
UserTransactionManager utm = new UserTransactionManager();
utm.setForceShutdown(false);
return utm;
}
@Bean(name="userTransactionImp")
public UserTransactionImp userTransactionImp() throws SystemException {
UserTransactionImp uti = new UserTransactionImp();
uti.setTransactionTimeout(300);
return uti;
}
@Bean(name="jmsContainer")
@Lazy(value=true)
public DefaultMessageListenerContainer jmsContainer() throws SystemException {
DefaultMessageListenerContainer dmlc = new DefaultMessageListenerContainer();
dmlc.setAutoStartup(false);
dmlc.setTransactionManager(jtaTransactionManager());
dmlc.setSessionTransacted(true);
dmlc.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
dmlc.setConnectionFactory(consumerXAConnectionFactory());
dmlc.setDestinationName("srcQueue");
return dmlc;
}
@Bean(name="transactedJmsTemplate")
public JmsTemplate transactedJmsTemplate() {
DynamicDestinationResolver dest = new DynamicDestinationResolver();
JmsTemplate jmsTmp = new JmsTemplate(producerXAConnectionFactory());
jmsTmp.setDeliveryPersistent(true);
jmsTmp.setSessionTransacted(true);
jmsTmp.setDestinationResolver(dest);
jmsTmp.setPubSubDomain(false);
jmsTmp.setReceiveTimeout(20000);
jmsTmp.setExplicitQosEnabled(true);
jmsTmp.setSessionTransacted(true);
jmsTmp.setDefaultDestination(new ActiveMQQueue("destQueue"));
jmsTmp.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
return jmsTmp;
}
DMLC を開始する前に、実行時に 2 つの AtomikosConnectionFactoryBean が ActiveMQXAConnectionFactory (ブローカーごとに 1 つ) をラップしています。
次に、次のメソッドを使用して、簡単な messageListener (開始前に dmlc に割り当てられます) をセットアップします。
public void onMessage(Message message) {
final Message rcvedMsg = message;
try{
MessageCreator msgCreator = new MessageCreator(){
public Message createMessage(Session session) throws JMSException{
Message returnMsg = null;
if(rcvedMsg instanceof TextMessage){
TextMessage txtMsg = session.createTextMessage();
txtMsg.setText(((TextMessage) rcvedMsg).getText());
returnMsg = txtMsg;
}
else if(rcvedMsg instanceof BytesMessage){
BytesMessage bytesMsg = session.createBytesMessage();
if(!(((BytesMessage) rcvedMsg).getBodyLength() > Integer.MAX_VALUE)){
byte[] bodyContent = new byte[(int) ((BytesMessage) rcvedMsg).getBodyLength()];
bytesMsg.writeBytes(bodyContent);
returnMsg = bytesMsg;
}
}
return returnMsg;
}
};
jmsTemplate.send(msgCreator);
}
catch(JmsException | JMSException e){
logger.error("Error when transfering message: '{}'. {}",message,e);
}
}
アプリケーションは特定のエラーや警告なしで起動しますが、ソース キューにメッセージを入れるとすぐに、同じメッセージに対して onMessage メソッドが何度も実行されていることがログで確認できます。ロールバックして再起動します (エラーはどこにもスローされません)。
また、たまたま同じソース URL と宛先 URL (同じブローカーを意味しますが、それぞれに独自の connectionFactory があることを意味します) を使用すると、それが機能し、メッセージがソース キューと宛先キューの間で意図したとおりに転送されることにも気付きました。
私が疑問に思っているのは
- セットアップで何が間違っていますか? 2 つの異なるブローカーを使用しているのに、同じブローカーを使用している場合 (ただし 2 つの異なる接続ファクトリを使用している場合) にトランザクションが何度もロールバックされているように見えるのはなぜですか?
- 現在、すべての例外をキャッチして何もしていないため、onMessage が現在適切なトランザクションを行っているとは完全に確信していません。これにより、jmstemplate がメッセージの送信を完了する前に dmlc のトランザクションがコミットされると思いますが、確信が持てません。この場合、代わりに SessionAwareMessageListener の方がよいでしょうか? onMessage メソッドで @Transacted を設定する必要がありますか?
この問題に光を当てるのを手伝ってくれる人はいますか? すべての入力を歓迎します。
アップデート:
「ロールバック」の問題は、使用していた両方の AMQ がブローカーのネットワークを介して相互に接続されており、たまたまソースと宛先に同じキュー名を使用していたことが原因であることに気付きました。これにより、メッセージがアプリケーションによってある AMQ から別の AMQ に転送された後、すぐに、ソース AMQ にコンシューマーがあったため、メッセージは元の AMQ に転送され、元の AMQ として認識されました。アプリケーションによって新しいメッセージが送信され、再度転送され、サイクルは無限に続きました。以下に投稿された解決策は、他の問題に役立ちました。