0

私は Apache Camel を使用しており、1 行ずつ処理する必要がある入力用の大きなファイルを取得しています。コンテンツは既にソートされており、連続するすべての行を同じ相関キーで集約する必要があります。相関キーが変更された場合、前の集計を完了する必要があります。ファイルが終了すると、最後の集約も完了します。いくつかの制約があります: - 受信ファイルがかなり大きいため、ストリーミング方式で処理したいと考えています。・結果が同期エンドポイントに渡されるため、タイムアウト完了述語は使いたくない。そうしないと、データ ソースの消費速度を調整する背圧が失われ、交換がタイムアウト マップと AggregateProcessor の集約リポジトリに蓄積されます。

PreCompletionAwareAggregationStrategy は有望なソリューションのように見えますが、最後の集計は次のファイルが到着するまで完了しないことが判明しました。preComplete で CamelSplitComplete プロパティを使用すると、最後の集約は完了しますが、最後の着信交換はありません。代わりに、この最後の交換は、次に到着するファイルのコンテンツに追加されます。

そのため、現在、過度に醜くない解決策を見つけるのにかなり迷っています。

4

2 に答える 2

0

説明したシナリオでは、分割されたメッセージをアグリゲーター (「AggregationRoute」と呼びましょう) を使用してルートに送信し、そのアグリゲーション戦略は PreCompletionAwareAggregationStrategy を実装します (既に使用されている方法だと思います)。そして分割が終わったらAGGREGATION_COMPLETE_ALL_GROUPSヘッダをtrueにしてAggregationRouteに送る。この交換は、すべての集約グループを完了するためのシグナルとしてのみ使用されます。

例:


    ...
    .split(body()).streaming()
        .to("direct:aggregationRoute")
    .end()
    .setHeader(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS,constant(true))
    .to("direct:aggregationRoute");

from("direct:aggregationRoute")
    .aggregate([your correlation expression]), myAggregationStrategy)
    ...

もう 1 つの方法は、AggregateController を使用して、そのメソッド forceCompletionOfAllGroups() を呼び出してすべてのグループの集約を終了することです。


AggregateController aggregateController = new DefaultAggregateController();

from(...)
    ...
    .split(body()).streaming()
        .aggregate([correlation expression], aggregationStrategy).aggregateController(aggregateController)
            ...
            // Do what you need to do with the aggregated exchange
            ...
        .end()
    .end()
    .bean(aggregateController, "forceCompletionOfAllGroups")
于 2016-11-17T22:07:48.357 に答える
0

データは既にソートされているため、ストリーミング方式で解析し、同じcorrelationkeyを持つ各行をハッシュマップ構造に追加することが、おそらく1つの方法です。新しいcorrelationkeyに遭遇したら、基本的にハッシュマップを「フラッシュ」して新しいメッセージを作成し、同じプロセスを再開する必要があります。ここを見てください: http://camel.apache.org/how-do-i-write-a-custom-processor-which-sends-multiple-messages.html

于 2016-11-12T02:33:22.387 に答える