2

Spring-AMQP で Transactionnal RabbitMQ チャネルを使用しようとしていますが、実際に例外を飲み込んでログに記録し、それらを回復できるようにしたいと考えています。

channelTransacted=true を使用すると、Channel が強制的に現在の transactionManager (私の場合は Hibernate) にも参加するようになり、その結果、commit Exception が @Transactionnal 境界から再スローされ、上位レベルでエラーが発生し、それをキャッチしてログに記録することができなくなります。それ。

また、パブリッシュをトランザクションに手動でアタッチして、コミットが成功した後にのみ実行されるようにしました。

public void publishFailSafeAfterSuccessfulTransaction(final String routingKey, final String message) {
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
        @Override
        public void afterCommit() {
            try {
                rabbitTemplate.convertAndSend(routingKey, message);
            } catch (Exception exception) {
                logger.error("Error while publishing message to RabbitMQ ");
            }
        }
});

そのように使用されます:

Entity entity = save(entity);
publishFailSafeAfterSuccessfulTransaction("routingkey", "Entity was updated");

しかし、その場合、channelTransacted=true を使用することはできません。これは、registeringSynchronization を別の registeringSynchronization 内にネストし、まったく呼び出されないためです...

これを達成する方法はありますか?

更新: 理想的には、 ConnectionFactoryUtils クラスで使用される RabbitResourceSynchronization をオーバーライドしたいのですが、これはファクトリがインスタンス化されていないプライベート クラスです。

TransactionSynchronizationManager.registerSynchronization(new RabbitResourceSynchronization(resourceHolder, connectionFactory, synched));
4

1 に答える 1

1

私が実装した解決策は、メイン トランザクションのコミット後に新しいトランザクション内で発行を行うことでした。

最初の呼び出し:

Entity entity = save(entity);
publishFailSafeAfterSuccessfulTransaction("routingkey", "Entity was updated");

このメソッドは、メイン トランザクションがコミットされた後に発行を行うように登録します。

public void publishFailSafeAfterSuccessfulTransaction(final String routingKey, final String event) {

    TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
        @Override
        public void afterCommit() {
            try {
                publishFailSafe(routingKey, event);
            } catch (Exception exception) {
                //Do some recovering
            }
        }
    });
}

メインのトランザクションがコミットされた後、これがパブリッシュを行います。チャネルが処理されると、その新しいトランザクションのコミットでメッセージがコミットされ、そのトランザクションのみが失敗し、エラーは前のメソッドでキャッチされます。

@Transactional(propagation = Propagation.REQUIRES_NEW)
public void publishFailSafe(String routingKey, String event) {
    try {
        rabbitTemplate.convertAndSend(routingKey.getRoutingKey(), event);
    } catch (Exception exception) {
        //Do some recovering
    }
}
于 2015-03-23T10:45:08.440 に答える