2

次のキャメルルートがあります:

<route id="myRoute">
    <from uri="direct:aggregator" />

    <aggregate strategy="aggregatorStrategy" completionInterval="60000" completionSize="500">
        <correlationExpression>
            <xpath>/fizz/buzz</xpath>
        </correlationExpression>

        <to uri="bean:postProcessor?method=run" />
    </aggregator>
</route>

ご覧のとおり、 が<aggregator/>受信した最初の 500 メッセージ、または 1 分間隔内のすべてのメッセージを集約し、集約したメッセージを という Bean に送信しますpostProcessor

この集計ロジックは、次のように考えることができます。

AGGREGATE UNTIL:
    We have received 500 messages
    OR
    1 minute has elapsed

THEN:
    Send to postProcessor

または疑似コードで: aggregateUntil(weHave500Message() || 1minHasElapsed()). このロジックを次のように変更したいと思います。

AGGREGATE UNTIL:
    We have received 500 messages
    OR
    1 minute has elapsed
    OR
    A message is received that has a property called "fireNow" and a value of "true"

THEN:
    Send to postProcessor

または、再び疑似コードで: aggregateUntil(weHave500Message() || 1minHasElapsed() || messageHasProperty("fireNow", "true")).

つまり、3 つの条件のいずれかが満たされるまで集計します。これを実装する方法はありますか?私はおそらく とでこれを仕上げることができると感じていますが、ここの木を通して森を見ることはできません.completionPredicateeagerCheckCompletion

4

1 に答える 1

4

completionSize次のように、と を一緒に使用して、式にcompletionIntervala を追加できます (集約されたメッセージではなく、受信メッセージをテストする必要があるため、使用する必要があります)。completionPredicateeagerCheckCompletion="true"

<route>
    <from uri="direct:aggregator" />
    <aggregate completionSize="500" completionInterval="60000" eagerCheckCompletion="true">
        <correlationExpression>
             <xpath>/fizz/buzz</xpath>
        </correlationExpression>
        <completion-predicate>
             <simple>${property.fireNow} == 'true'</simple>
        </completion-predicate>
        <to uri="bean:postProcessor?method=run" />
    </aggregate>
</route>

別の方法として、3 つの条件すべてをテストし、その述語のみを使用する複合述語を作成することもできますがcompletion-predicate、インターバル タイムアウトが別のスレッドによってトリガーされるため、このソリューションは劣りますが、複合述語を使用すると、集計をトリガーするスレッドがありません。タイムアウトの有効期限。

于 2014-02-06T00:28:06.410 に答える