1

camel と activeMq を使用した再試行メカニズムに取り組んでいます。私がやりたいことは、呼び出しているサーバーの 1 つがダウンしている場合に再試行メカニズムを開始し、要求をキューに追加して、時間ごとにサーバーに再送信することです。無視されているように見える再試行ポリシーを除いて、すべて正常に機能します (私の要求はキューに入るときに再送信され、再試行回数に達した後は DLQ に追加されません)

私の構成は次のようになります (値は .cfg ファイルから読み取られます)。

<beans xmlns="http://www.springframework.org/schema/beans"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans-2.5.xsd">
<bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    <property name="brokerURL" value="${activemq.broker.url}"/>
    <property name="userName" value="${activemq.broker.username}"/>
    <property name="password" value="${activemq.broker.password}"/>
    <property name="redeliveryPolicy" ref="policy"/>
</bean>

<bean id="policy" class="org.apache.activemq.RedeliveryPolicy">
    <property name="queue" value="*"/>
    <property name="initialRedeliveryDelay" value="${activemq.redelivery.delay}"/>
    <property name="redeliveryDelay" value="${activemq.redelivery.delay}"/>
    <property name="useExponentialBackOff" value="false"/>
    <property name="maximumRedeliveries" value="${activemq.number.of.redeliveries}"/>
</bean>

<bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
    <property name="maxConnections" value="8"/>
    <property name="connectionFactory" ref="jmsConnectionFactory"/>
</bean>

<bean id="jmsTransactionManager" class="org.springframework.jms.connection.JmsTransactionManager">
    <property name="connectionFactory" ref="pooledConnectionFactory"/>
</bean>

<bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
    <property name="connectionFactory" ref="pooledConnectionFactory"/>
    <property name="transactionManager" ref="jmsTransactionManager"/>
    <property name="transacted" value="true"/>
    <property name="cacheLevelName" value="CACHE_CONSUMER"/>
    <property name="concurrentConsumers" value="8"/>
</bean>

<bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
    <property name="configuration" ref="jmsConfig"/>
</bean>

私のサービスを呼び出すキャメルルート:

<route id="addRegistrationRoute">
        <from uri="direct:addRegistrationRoute"/>

        <to uri="cxf:bean:addRegistrationEndpoint"/>

        <onException>
            <exception>java.net.ConnectException</exception>
            <onWhen>
                <el>${in.headers['previousRoute'] != 'registrationRetryRoute'}</el>
            </onWhen>
            <handled>
                <constant>true</constant>
            </handled>

            <setBody>
                <simple>${headers.request}</simple>
            </setBody>
            <removeHeaders pattern="*"/>
            <to uri="activemq:queue:registrationRetryQueue"/>
            <stop/>
        </onException>
        <onException>
            <exception>org.apache.cxf.interceptor.Fault</exception>
            <onWhen>
                <el>${in.headers['previousRoute'] != 'registrationRetryRoute'}</el>
            </onWhen>
            <handled>
                <constant>true</constant>
            </handled>

            <setBody>
                <simple>${headers.request}</simple>
            </setBody>
            <removeHeaders pattern="*"/>

            <to uri="activemq:queue:registrationRetryQueue"/>
            <stop/>
        </onException>
        <onException>
            <exception>javax.xml.soap.SOAPFault</exception>
            <handled>
                <constant>true</constant>
            </handled>

        </onException>
    </route>
    <route id="registrationRetryRoute">
        <from uri="activemq:queue:registrationRetryQueue"/>

        <setHeader headerName="previousRoute">
            <simple>registrationRetryRoute</simple>
        </setHeader>
        <to uri="direct:addRegistrationRoute"/> <!-- Back to the initial flow. -->
    </route>

activeMq の設定で何が間違っていたのか誰か教えていただければ、本当に感謝しています!

よろしく、ロクサーナ

4

1 に答える 1

0

ActiveMQ は、コンシューマー (またはメッセージ リスナー) からエラーをスローしている場合にのみ、メッセージを DLQ にプッシュします。そのため、例外をキャッチしてスローバックし、それに応じて再試行ポリシーが有効になるようにしてください。

于 2014-12-20T17:08:20.227 に答える