4

クラス@Retryableのメソッドでアノテーションを使用しています@Service

@Service
@EnableRetry 
public class PushService {

    @Retryable(maxAttempts=5)
    public Result pushIt(myMessage messageIn) {
        ...
    }
}

そしてそれは魅力のように機能します:私はRabbitMQから直接メッセージを受け取っています.エラーがなくなるか、試行回数が5に達するまで確認されず、その時点でメッセージは直接DLQに送られます.欲しかった。

私の唯一の問題は、プロパティ ファイルから maxAttempts を動的に設定する必要があることです。解決策はインターセプターを設定する必要がありますが、インターセプターがあるという唯一の事実は、たとえば次の場合にエラーを引き起こします。

@Service
@EnableRetry 
public class PushService {

    @Retryable(interceptor="myInterceptor") 
    public Result pushIt(myMessage messageIn) {
        ...
    }
}

myInterceptor は次のように定義されます。

@Bean
public StatefulRetryOperationsInterceptor myInterceptor() {
    return RetryInterceptorBuilder.stateful().maxAttempts(5).build();
}

次の例外を除いて、無限ループが発生します。

2015-04-08 07:12:10,970 GMT [SimpleAsyncTaskExecutor-1] (ConditionalRejectingErrorHandler.java:67) WARN  listener.ConditionalRejectingErrorHandler: Execution of Rabbit message listener failed.
org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:864)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:802)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:690)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:82)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:167)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1241)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:660)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1005)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:989)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$700(SimpleMessageListenerContainer.java:82)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1103)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ArrayIndexOutOfBoundsException: 1
    at org.springframework.amqp.rabbit.config.StatefulRetryOperationsInterceptorFactoryBean$3.getKey(StatefulRetryOperationsInterceptorFactoryBean.java:103)
    at org.springframework.retry.interceptor.StatefulRetryOperationsInterceptor.invoke(StatefulRetryOperationsInterceptor.java:132)
    at org.springframework.retry.annotation.AnnotationAwareRetryOperationsInterceptor.invoke(AnnotationAwareRetryOperationsInterceptor.java:118)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179)
    at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:653)
    at com.acme.push.service.PushService$$EnhancerBySpringCGLIB$$9d503bc1.pushMessage(<generated>)
    at com.acme.push.receiver.PushListener.onMessage(PushListener.java:42)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:799)
    ... 10 more

私はそれを単純にしすぎていると確信していますが、このエラーの原因とその解決方法についての手がかりがありません。

4

2 に答える 2

1

@Retrayableアノテーションを使用せずに、必要な柔軟性を得ることができました。

RetryAdvice遅延と最大試行回数のパラメーターを使用してを作成しました。

@Bean
public MethodInterceptor retryAdvice() {
    FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
    backOffPolicy.setBackOffPeriod(delay);
    return RetryInterceptorBuilder.stateless().backOffPolicy(backOffPolicy)
            .maxAttempts(maxAttempts).build();
}

そして、アドバイスをのadviceChainに挿入しましたListenerContainer

@Bean
public SimpleMessageListenerContainer replyListenerContainer() {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(pushConnectionFactory());
    container.setQueues(pushQueue());
    container.setMessageListener(pushListener());

    Advice[] adviceChain = new Advice[] { retryAdvice() };
    container.setAdviceChain(adviceChain);

    return container;
}

このようにして、リスナーがスローするたびに

throw new AmqpRejectAndDontRequeueException(cause);

これにより、コンテナーは、指定された回数、必要な遅延で再試行します。その後、例外が伝播され、メッセージが DLQ で配信されます。

于 2015-04-09T10:16:45.780 に答える