2

アグリゲーター内のすべてのアグリゲーションに対して、フラッシュまたはcompletionSizeが効果的に必要です。グローバルcompletionSizeのように。

基本的に、バッチで受信されるすべてのメッセージが集約され、最後のメッセージが読み取られたときに、そのアグリゲーター内のすべての集約が一度に完了するようにします。

e.g. 1000 messages arrive (the length is not known beforehand)

     aggregate on correlation id into bins
        A 300
        B 400
        C 300   (size of the bins is not known before hand)

     I want the aggregator not to complete until the 1000th exchange is aggregated
     thereupon I want all of the aggregations in the aggregator to complete at once

CompleteSizeは各アグリゲーションに適用され、残念ながらアグリゲーター全体には適用されません。したがって、CompleteSize(1000)を設定すると、各集計が「完了する」前に1000を超える必要があるため、終了することはありません。

単一のMapオブジェクトを作成することで回避できますが、これはaggregator2の相関関係を回避するものであり、理想的には使用したいと思います。

そうですね、グローバルコンプリートサイズまたはフラッシングのいずれかで、これをインテリジェントに行う方法はありますか?

4

3 に答える 3

3

1つのオプションは、グローバルカウンターを保持するロジックを追加しExchange.AGGREGATION_COMPLETE_ALL_GROUPS、到達したらヘッダーを設定することです...

Camel 2.9以降で使用可能...ヘッダーExchange.AGGREGATION_COMPLETE_ALL_GROUPSがtrueに設定されているメッセージを送信することにより、現在の集約されたすべての交換を手動で完了することができます。メッセージはシグナルメッセージのみと見なされ、メッセージヘッダー/コンテンツはそれ以外の場合は処理されません。

于 2013-02-25T16:24:30.497 に答える
0

Camel aggregator eip doc http://camel.apache.org/aggregator2を見て、さまざまな完了条件について読むことをお勧めします。また、Benが参照する特別なメッセージは、すべての実行中の集計を完了するためにシグナルに送信できます。

バッチコンシューマーhttp://camel.apache.org/batch-consumer.htmlから消費する場合は、バッチが完了したときに完了する特別な補完を使用できます。たとえば、JPAデータベーステーブルなどからファイルまたは行を取得する場合。バッチコンシューマからのすべてのメッセージが処理されると、アグリゲータは、completionFromBatchConsumerオプションを使用して、これらすべての集約メッセージの完了を通知できます。

また、Camel in Actionの本のコピーをお持ちの場合は、第8章のセクション8.2をお読みください。これは、EIPの集約に関するすべてがより詳細に説明されているためです。

于 2013-02-26T15:51:15.677 に答える
0

Exchange.AGGREGATION_COMPLETE_ALL_GROUPS_INCLUSIVEを使用すると、次のように機能しました。

from(endpoint)
        .unmarshal(csvFormat)
        .split(body())
        .bean(CsvProcessor())
        .choice()
        // If all messages are processed,
        // flush the aggregation
        .`when`(simple("\${property.CamelSplitComplete}"))
        .setHeader(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS_INCLUSIVE, constant(true))
        .end()
        .aggregate(simple("\${body.trackingKey}"),
                AggregationStrategies.bean(OrderAggregationStrategy()))
        .completionTimeout(10000)
于 2019-09-07T14:40:46.627 に答える