1

私は、Spring Integration フレームワークと分散トランザクション用の atomikos を使用して構築されたプロジェクトに取り組んでいます。最近、統合テストを実行して、メッセージがシステムを介して正しく送信されていることを確認しようとしています。これらの統合テストの 1 つを実行すると、新しいトランザクションが作成されていることを示す 10 個のログ メッセージと、トランザクションのコミットを示す 10 個のログ メッセージが表示されることに気付きました。

メッセージがチャネルからエンドポイントに、またはその逆に渡されるたびに、Spring は新しいトランザクションを作成しますか?

以下のコードは、(transactionManager を使用して) message-driven-channel-adapter でメッセージを受け取り、ルーターに送信します。次に、ルーターは、トランスフォーマー、サービス アクティベーター (retryAdvice を使用)、およびアウトバウンド メッセージ アダプターを含むチェーンにメッセージを送信します。

私の知る限り、メッセージ駆動型チャネル アダプターがキューからメッセージを受信すると 1 つのトランザクションが作成され、メッセージの処理が完了するとコミットが行われるはずです。したがって、合計で 3 つのトランザクションがあると思いました。テストでのメッセージの送信から 1 つ、インバウンド アダプターから 1 つ、テストでのメッセージの受信から 1 つです。

spring-datasource.xml から

<bean id="transactionManager"
      class="org.springframework.transaction.jta.JtaTransactionManager">
    <property name="transactionManager" ref="AtomikosTransactionManager" />
    <property name="userTransaction" ref="AtomikosUserTransaction" />
</bean>

spring-context.xml から

<jms:message-driven-channel-adapter
        id="inventoryQueue_inbound_adapter"
        destination-name="queue.inventory"
        channel="InventoryRouterChannel"
        error-channel="inventoryErrorChannel"
        transaction-manager="transactionManager"
        acknowledge="transacted"/>

<integration:router input-channel="InventoryRouterChannel">
    <bean class="com.inventory.InventoryRouter"/>
</integration:router>

<integration:chain input-channel="rxAddToCountWell">
    <integration:json-to-object-transformer type="com.events.RxAddToCountWell"/>
    <integration:service-activator ref="addToCountWellHandler" method="formatCountwellMessage">
        <integration:request-handler-advice-chain>
            <ref bean="retryAdvice"/>
        </integration:request-handler-advice-chain>
    </integration:service-activator>
    <jms:outbound-channel-adapter destination-name="OUTBOUND.QUEUE"/>
</integration:chain>

<bean id="retryAdvice" class="org.springframework.integration.handler.advice.RequestHandlerRetryAdvice">
    <property name="recoveryCallback">
        <bean class="org.springframework.integration.handler.advice.ErrorMessageSendingRecoverer">
            <constructor-arg ref="inventoryErrorChannel"/>
        </bean>
    </property>
    <property name="retryTemplate">
        <bean class="org.springframework.retry.support.RetryTemplate">
            <property name="retryPolicy">
                <bean class="org.springframework.retry.policy.SimpleRetryPolicy">
                    <property name="maxAttempts" value="2"/>
                </bean>
            </property>
            <property name="backOffPolicy">
                <bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy">
                    <property name="initialInterval" value="1000"/>
                    <property name="multiplier" value="2"/>
                </bean>
            </property>
        </bean>
    </property>
</bean>

メッセージエンドポイント

@MessageEndpoint
public class AddToCountWellHandler {

    public static final Logger logger = Logger.getLogger(AddToCountWellHandler.class);

    public Message<String> formatCountwellMessage(RxAddToCountWell payload) {
        //our logic here
        //...
        return MessageBuilder.withPayload(temp).build();
    }
}

試験方法

@Test
@DirtiesContext
public void addToCountWellIntegrationTest() throws InterruptedException, SystemException, NotSupportedException, HeuristicRollbackException, HeuristicMixedException, RollbackException {

    // Send the message to the handler
    transactionManager.getTransactionManager().begin();
    jmsTemplate.send("queue.inventory", new MessageCreator() {
        @Override
        public Message createMessage(Session session) throws JMSException {
            TextMessage message;
            try {
                message = session.createTextMessage(getJson("./doc/event_json_examples/inventory/rxAddToCountWell.json"));
            } catch (Exception e) {
                //...
            }
            message.setJMSType("rxAddToCountWell");
            return message;
        }
    });
    transactionManager.getTransactionManager().commit();

    transactionManager.getTransactionManager().begin();

    //verify that it was placed on the queue
    TextMessage output = (TextMessage) jmsTemplate.receive(COUNTWELL_QUEUE_NAME);
    assertNotNull(output);

    transactionManager.getTransactionManager().commit();
    appContext.close();
}

ログ

2014-04-08 13:55:35,018 [INFO ] org.springframework.transaction.jta.JtaTransactionManager - Using JTA UserTransaction: com.atomikos.icatch.jta.UserTransactionImp@188c838 (org.springframework.transaction.jta.JtaTransactionManager.checkUserTransactionAndTransactionManager(JtaTransactionManager.java:471))
2014-04-08 13:55:35,019 [INFO ] org.springframework.transaction.jta.JtaTransactionManager - Using JTA TransactionManager: com.atomikos.icatch.jta.UserTransactionManager@111089b (org.springframework.transaction.jta.JtaTransactionManager.checkUserTransactionAndTransactionManager(JtaTransactionManager.java:482))
2014-04-08 13:55:36,172 [DEBUG] org.springframework.transaction.jta.JtaTransactionManager - Creating new transaction with name [org.springframework.jms.listener.DefaultMessageListenerContainer#2]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT (org.springframework.transaction.support.AbstractPlatformTransactionManager.getTransaction(AbstractPlatformTransactionManager.java:366))
2014-04-08 13:55:36,171 [DEBUG] org.springframework.transaction.jta.JtaTransactionManager - Creating new transaction with name [org.springframework.jms.listener.DefaultMessageListenerContainer#0]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT (org.springframework.transaction.support.AbstractPlatformTransactionManager.getTransaction(AbstractPlatformTransactionManager.java:366))
2014-04-08 13:55:36,172 [DEBUG] org.springframework.transaction.jta.JtaTransactionManager - Creating new transaction with name [org.springframework.jms.listener.DefaultMessageListenerContainer#4]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT (org.springframework.transaction.support.AbstractPlatformTransactionManager.getTransaction(AbstractPlatformTransactionManager.java:366))
2014-04-08 13:55:36,172 [DEBUG] org.springframework.transaction.jta.JtaTransactionManager - Creating new transaction with name [org.springframework.jms.listener.DefaultMessageListenerContainer#3]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT (org.springframework.transaction.support.AbstractPlatformTransactionManager.getTransaction(AbstractPlatformTransactionManager.java:366))
2014-04-08 13:55:36,171 [DEBUG] org.springframework.transaction.jta.JtaTransactionManager - Creating new transaction with name [org.springframework.jms.listener.DefaultMessageListenerContainer#1]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT (org.springframework.transaction.support.AbstractPlatformTransactionManager.getTransaction(AbstractPlatformTransactionManager.java:366))
2014-04-08 13:55:37,619 [DEBUG] org.springframework.transaction.jta.JtaTransactionManager - Initiating transaction commit (org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:753))
2014-04-08 13:55:37,631 [DEBUG] org.springframework.transaction.jta.JtaTransactionManager - Creating new transaction with name [org.springframework.jms.listener.DefaultMessageListenerContainer#2]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT (org.springframework.transaction.support.AbstractPlatformTransactionManager.getTransaction(AbstractPlatformTransactionManager.java:366))
2014-04-08 13:55:38,656 [DEBUG] org.springframework.transaction.jta.JtaTransactionManager - Initiating transaction commit (org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:753))
2014-04-08 13:55:38,658 [DEBUG] org.springframework.transaction.jta.JtaTransactionManager - Creating new transaction with name [org.springframework.jms.listener.DefaultMessageListenerContainer#0]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT (org.springframework.transaction.support.AbstractPlatformTransactionManager.getTransaction(AbstractPlatformTransactionManager.java:366))
2014-04-08 13:55:39,667 [DEBUG] org.springframework.transaction.jta.JtaTransactionManager - Initiating transaction commit (org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:753))
2014-04-08 13:55:39,670 [DEBUG] org.springframework.transaction.jta.JtaTransactionManager - Creating new transaction with name [org.springframework.jms.listener.DefaultMessageListenerContainer#4]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT (org.springframework.transaction.support.AbstractPlatformTransactionManager.getTransaction(AbstractPlatformTransactionManager.java:366))
2014-04-08 13:55:40,705 [DEBUG] org.springframework.transaction.jta.JtaTransactionManager - Initiating transaction commit (org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:753))
2014-04-08 13:55:40,706 [DEBUG] org.springframework.transaction.jta.JtaTransactionManager - Creating new transaction with name [org.springframework.jms.listener.DefaultMessageListenerContainer#1]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT (org.springframework.transaction.support.AbstractPlatformTransactionManager.getTransaction(AbstractPlatformTransactionManager.java:366))
2014-04-08 13:55:40,748 [DEBUG] com.inventory.InventoryRouter - Routing jms_type: rxAddToCountWell (com.inventory.InventoryRouter.route(InventoryRouter.java:25))
2014-04-08 13:55:40,834 [DEBUG] org.springframework.retry.support.RetryTemplate - Retry: count=0 (org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:251))
2014-04-08 13:55:40,835 [DEBUG] com.inventory.AddToCountWellHandler - Entered formatCountwellMessage... (com.inventory.AddToCountWellHandler.formatCountwellMessage(AddToCountWellHandler.java:31))
2014-04-08 13:55:41,179 [DEBUG] com.inventory.AddToCountWellHandler - Leaving formatCountwellMessage (com.inventory.AddToCountWellHandler.formatCountwellMessage(AddToCountWellHandler.java:38))
2014-04-08 13:55:41,190 [DEBUG] org.springframework.transaction.jta.JtaTransactionManager - Initiating transaction commit (org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:753))
2014-04-08 13:55:41,193 [DEBUG] org.springframework.transaction.jta.JtaTransactionManager - Creating new transaction with name [org.springframework.jms.listener.DefaultMessageListenerContainer#3]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT (org.springframework.transaction.support.AbstractPlatformTransactionManager.getTransaction(AbstractPlatformTransactionManager.java:366))
2014-04-08 13:55:43,396 [DEBUG] org.springframework.transaction.jta.JtaTransactionManager - Initiating transaction commit (org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:753))
2014-04-08 13:55:44,406 [DEBUG] org.springframework.transaction.jta.JtaTransactionManager - Initiating transaction commit (org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:753))
2014-04-08 13:55:45,422 [DEBUG] org.springframework.transaction.jta.JtaTransactionManager - Initiating transaction commit (org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:753))
2014-04-08 13:55:46,448 [DEBUG] org.springframework.transaction.jta.JtaTransactionManager - Initiating transaction commit (org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:753))
2014-04-08 13:55:47,453 [DEBUG] org.springframework.transaction.jta.JtaTransactionManager - Initiating transaction commit (org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:753))
4

2 に答える 2

0

構成と一致しませんが、5 つのリスナー スレッドがあるようです ( #0..#4)。そのうちの 4 つは、メッセージの処理中に no-op トランザクション (受信するメッセージがない) を実行#3します。アイドル スレッドは、「場合に備えて」メッセージを受信する前にトランザクションを開始する必要があります。

これらのスレッドでは作業が行われていないため、コミットは軽量である必要があります。受信タイムアウトを増やしてこれを最小限に抑えることができますが、コンテナーのシャットダウンに時間がかかる可能性があります (JMS クライアント ライブラリでスレッドがブロックされるため)。

デフォルトreceiveTimeoutは 1 秒です。

于 2014-04-08T20:03:39.580 に答える