1

サーバーにファイルを送信する複数のクライアントがあります。1 つのデータ セットに対して、そのデータに関する情報を含む 2 つのファイルがあり、それぞれが同じ名前です。ファイルが受信されると、サーバーは、ファイル パス、ファイル名、クライアントの ID、およびファイルの「タイプ」を含むメッセージをキューに送信します (すべて同じファイル拡張子を持ちますが、「タイプ」が 2 つあります。 "それらを A と B と呼びます)。

1 組のデータの 2 つのファイルは同じファイル名です。サーバーが両方のファイルを受信したらすぐに、2 つのファイルを結合するプログラムを開始する必要があります。現在、私は次のようなものを持っています:

from("jms:queue.name").aggregate(header("CamelFileName")).completionSize(2).to("exec://FILEPATH?args=");

私が立ち往生しているのは、ヘッダー( "CamelFileName")、およびより具体的にはアグリゲーターの仕組みです。

completionSize を 2 に設定すると、最初のメッセージと一致する 2 番目のメッセージが送信されるまで、すべてのメッセージを吸い上げて何らかのデータ構造に格納するだけですか? また、header() は特定の値を期待していますか? 私は複数のクライアントを持っているので、クライアント ID とファイル名をヘッダーに含めることを考えていましたが、特定の値を指定する必要があるかどうかはわかりません。また、正規表現を使用できるかどうかもわかりません。

アイデアやヒントは非常に役立ちます。ありがとう

編集:ここに私が今持っているコードがあります。ここでの問題の説明と、選択した回答へのコメントに基づいて、それは正確に見えますか (私がコピーしなかった閉じ括弧を除く)?

public static void main(String args[]) throws Exception{
        CamelContext c = new DefaultCamelContext();
        c.addComponent("activemq", activeMQComponent("vm://localhost?broker.persistent=false"));
        //ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
        //c.addComponent("jms", JmsComponent.jmsComponentAutoAcknowledge(connectionFactory));
        c.addRoutes(new RouteBuilder() {
            public void configure() {
                from("activemq:queue:analytics.camelqueue").aggregate(new MyAggregationStrategy()).header("subject").completionSize(2).to("activemq:queue:analytics.success");
            }
        });
        c.start();
        while (true) {
            System.out.println("Waiting on messages to come through for camel");
            Thread.sleep(2 * 1000);
        }
        //c.stop();
    }

    private static class MyAggregationStrategy implements AggregationStrategy {

        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
            if (oldExchange == null)
                return newExchange;
            // and here is where combo stuff goes
            String oldBody = oldExchange.getIn().getBody(String.class);
            String newBody = newExchange.getIn().getBody(String.class);
            boolean oldSet = oldBody.contains("set");
            boolean newSet = newBody.contains("set");
            boolean oldFlow = oldBody.contains("flow");
            boolean newFlow = newBody.contains("flow");
            if ( (oldSet && newFlow) || (oldFlow && newSet) ) {
                //they match so return new exchange with info so extractor can be started with exec
                String combined = oldBody + "\n" + newBody + "\n";
                newExchange.getIn().setBody(combined);
                return newExchange;
            }
            else {
                // no match so do something....
                return null;
            }
        }
    }
4

1 に答える 1

3

交換を結合する方法を定義するには、AggregationStrategy を指定する必要があります...

fileName のみに関心があり、正確に 2 つの Exchange を受信する場合は、UseLatestAggregationStrategy を使用して、2 つが「集約」されたら最新の Exchange を渡すことができます...

そうは言っても、両方の Exchange (clientId ごとに 1 つ) を保持する必要があるように思われるので、その情報を「exec」ステップに渡すことができます。 -オプションを介して有効化された集計戦略でgroupExchanges...または、カスタム AggregationStrategy を指定して、必要に応じてそれらを組み合わせます。「exec」ステップでは、使用することにした集約構造を処理する必要があることに注意してください...

例については、次の単体テストを参照してください。

https://svn.apache.org/repos/asf/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorTest.java

https://svn.apache.org/repos/asf/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeTest.java

于 2013-06-21T17:56:04.017 に答える