2

jdbc に裏打ちされたcamel:aggregateを使用していますが、Exchange プロパティを保存していないようです。たとえば、次のルートを構成し、集約が完了したら実行を停止し、camel:to(log) を実行する直前に、再起動時にデータベースからデータを取得するように集約を強制すると、camel:to(log) は実行されません。プロパティmyPropertyを出力します

<camel:route id="myRoute">
    <camel:from uri="direct:in"/>

    <camel:setProperty propertyName="myProperty">
        <camel:constant>myPropertyValue</camel:constant>
    </camel:setProperty>

    <camel:aggregate strategyRef="myStrategy" aggregationRepositoryRef="myAggregationRepo" discardOnCompletionTimeout="true" completionTimeout="86400000" >
        <camel:correlationExpression>
            <camel:simple>${property.partlastcorrelationkey}</camel:simple>
        </camel:correlationExpression>
        <camel:completionPredicate>
            <camel:simple>${property.partlastcorrelationwaitmore} == false</camel:simple>
        </camel:completionPredicate>

        <camel:to uri="log:com.test?showAll=true"/>

    </camel:aggregate>
</camel:route>

私の集計リポジトリは次のように構成されています。

<bean id="myAggregationRepo" class="org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository" init-method="start" destroy-method="stop">
    <property name="transactionManager" ref="transactionManager"/>
    <property name="repositoryName" value="PROC_AGG"/>
    <property name="dataSource" ref="oracle-ds"/>
    <property name="lobHandler">
        <bean class="org.springframework.jdbc.support.lob.OracleLobHandler">
            <property name="nativeJdbcExtractor">
                <bean class="org.springframework.jdbc.support.nativejdbc.CommonsDbcpNativeJdbcExtractor"/>
            </property>
        </bean>
    </property>
</bean>

Aggregator の使用時にプロパティを保存するにはどうすればよいですか?

4

1 に答える 1

2

私は自分自身に答えます。コードに見られるように、 JdbcCamelCodec は、データベースでアグリゲーターをバッキングするときにプロパティを保存することを許可しません:

public final class JdbcCamelCodec {
    public byte[] marshallExchange(CamelContext camelContext, Exchange exchange) throws IOException {
        // use DefaultExchangeHolder to marshal to a serialized object
        DefaultExchangeHolder pe = DefaultExchangeHolder.marshal(exchange, false);
        // add the aggregated size property as the only property we want to retain
        DefaultExchangeHolder.addProperty(pe, Exchange.AGGREGATED_SIZE, exchange.getProperty(Exchange.AGGREGATED_SIZE, Integer.class));
        // add the aggregated completed by property to retain
        DefaultExchangeHolder.addProperty(pe, Exchange.AGGREGATED_COMPLETED_BY, exchange.getProperty(Exchange.AGGREGATED_COMPLETED_BY, String.class));
        // add the aggregated correlation key property to retain
        DefaultExchangeHolder.addProperty(pe, Exchange.AGGREGATED_CORRELATION_KEY, exchange.getProperty(Exchange.AGGREGATED_CORRELATION_KEY, String.class));
        // persist the from endpoint as well
        if (exchange.getFromEndpoint() != null) {
            DefaultExchangeHolder.addProperty(pe, "CamelAggregatedFromEndpoint", exchange.getFromEndpoint().getEndpointUri());
        }
    return encode(pe);
}

基本的に、問題はfalseが意味するこの行にあります: プロパティを保存しないでください。

DefaultExchangeHolder pe = DefaultExchangeHolder.marshal(exchange, false);

データベースに保存されるのは、ヘッダーと本文だけです。

于 2013-02-05T09:49:12.473 に答える