1

ルート(Apache Camel の言い回し) についての私の理解では、ルートはあるエンドポイントから別のエンドポイントへのデータの流れを表し、途中でデータに対して EIP タイプの操作を実行するさまざまなプロセッサで停止します。

それがルートの正しい/公正な評価である場合、同じ内部に複数のルートが必要であると思わCamelContextれる問題をモデル化しています(私はSpringを使用しています):

  1. ルート 1: Source-1 からデータを抽出して処理し、に変換してList<SomePOJO>から、アグリゲーターに送信します。
  2. ルート 2: Source-2 からデータを抽出して処理し、さらに に変換してからList<SomePOJO>アグリゲーターに送信します。
  3. ルート 3:ルート 1 とルート 2 の両方List<SomePOJO>からを受信するまで待機し、その後、集約リストの処理を続行するアグリゲーターが含まれています。

ここに問題があります:両方List<SomePOJO>の s が同時にアグリゲーターに到着する必要があります。または、アグリゲーター Bean は、2 つのリストを 1 つに集約して集約されたリストを送信する前に、両方のルートからデータを受信するまで待機List<SomePOJO>する必要があります国道3号線の残り。

これまでのところ、次の疑似コードがあります<camelContext>

<camelContext id="my-routes" xmlns="http://camel.apache.org/schema/spring">
    <!-- Route 1 -->
    <route id="route-1">
        <from uri="time://runOnce?repeatCount=1&amp;delay=10" />

        <!-- Extracts data from Source 1, processes it, and then produces a List<SomePOJO>. -->
        <to uri="bean:extractor1?method=process" />

        <!-- Send to aggregator. -->
        <to uri="direct:aggregator" />
    </route>

    <!-- Route 2 -->
    <route id="route-2">
        <from uri="time://runOnce?repeatCount=1&amp;delay=10" />

        <!-- Extracts data from Source 2, processes it, and then produces a List<SomePOJO>. -->
        <to uri="bean:extractor2?method=process" />

        <!-- Send to aggregator. -->
        <to uri="direct:aggregator" />
    </route>

    <!-- Route 3 -->
    <route id="route-3">
        <from uri="direct:aggregator" />

        <aggregate strategyRef="listAggregatorStrategy">
            <correlationExpression>
                <!-- Haven't figured this part out yet. -->
            </correlationExpression>
            <to uri="bean:lastProcessor?method=process" />
        </aggregate>
    </route>
</camelContext>

<bean id="listAggregatorStrategy" class="com.myapp.ListAggregatorStrategy" />

次にJavaで:

public class ListAggregatorStrategy implements AggregatoryStrategy {
    public Exchange aggregate(Exchange exchange) {
        List<SomePOJO> route1POJOs = extractRoute1POJOs(exchange);
        List<SomePOJO> route2POJOs = extractRoute2POJOs(exchange);

        List<SomePOJO> aggregateList = new ArrayList<SomePOJO>(route1POJOs);
        aggregateList.addAll(route2POJOs);

        return aggregateList;
    }
}

私の質問

  1. 私の基本設定は正しいですか?つまり、direct:aggregatorエンドポイントを正しく使用して、のアグリゲーターroute-1との間route-2でデータを送受信していますか?route-3
  2. アグリゲーターは期待どおりに機能しますか? Beanの実行には 5 秒しかかからないが、extractor1Beanの実行には 2 分かかるとします。t=5 で、アグリゲーターは からデータを受信し、終了するまで (2 分間) 待機を開始し、残りのデータを集約する必要があります。はい?route-1extractor2route-2extractor1extractor2
4

1 に答える 1

0

正しい方向に進んでいるようですね。Aggregatorページには、これに関する多くの優れた情報があります。

これ<correlationExpression>は、各ルートから Exchange を照合するための鍵であり、completeSize は待機する数を指定できます。あなたの場合、各ルートは 1 回だけ実行するように設計されているように見えます。その場合、式は各 Exchange からの固定ヘッダー値を使用している可能性があります。それ以外の場合は、ルートごとにカウンター クラスのようなものが必要になります。

例の更新は次のとおりです。

<route id="route-1">
    <from uri="time://runOnce?repeatCount=1&amp;delay=10" />
    <to uri="bean:extractor1?method=process" />
    <setHeader headerName="id">
        <constant>myHeaderValue</constant>
    </setHeader>
    <to uri="direct:aggregator" />
</route>

<route id="route-2">
    <from uri="time://runOnce?repeatCount=1&amp;delay=10" />
    <to uri="bean:extractor2?method=process" />
    <setHeader headerName="id">
        <constant>myHeaderValue</constant>
    </setHeader>
    <to uri="direct:aggregator" />
</route>

<route id="route-3">
    <from uri="direct:aggregator" />

    <aggregate strategyRef="listAggregatorStrategy" completionSize="2">
        <correlationExpression>
            <simple>header.id</simple>
        </correlationExpression>
        <to uri="bean:lastProcessor?method=process" />
    </aggregate>
</route>
于 2013-12-13T19:54:51.280 に答える