1

連続する市場データの集計ビューを生成しようとしています。つまり、2 つのメッセージごとに合計値を計算する必要があります。入ってくるデータを次のように言います。

(V0,T0),(V1,T1),(V2,T2),(V3,T3)....

VTは、データを受信したときのタイムスタンプを意味します。

2 ポイントごとに合計を生成する必要があります。

(R1=Sum(V0,V1),T1),(R2=Sum(V1,V2),T2),(R3=Sum(V2,V3),T3),....

aggregator2を使用してこれを行う方法、またはこのためのプロセッサを作成する必要があるという提案はありますか?

4

2 に答える 2

1

そうです、aggregator2コンポーネントは良い方法です。私はそのようなことを試みます:

from("somewhere").split(body().tokenize("),")).streaming()
    .aggregate(new ValueAggregationStrategy()).completionTimeout(1500)
    .to("whatYouWant");

class ValueAggregationStrategy implements AggregationStrategy {

    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        if (oldExchange == null) {
            return newExchange;
        }

        String oldBody = oldExchange.getIn().getBody(String.class);
        String newBody = newExchange.getIn().getBody(String.class);

        oldExchange.getIn().setBody(extractValue(oldBody) + extractValue(newBody));
        return oldExchange;
    }

    public int extractValue(String body) {
        // Do the work "(V0,T0" -> "V0"
    }
}

注意: 次のような形式があれば、解析が容易になります。V0,T0;V1,T1...

詳細について は、Claus Ibsen が書いた Camel での大きなファイルの解析に関する記事を参照してください。

于 2013-11-05T20:54:46.970 に答える
1

Aggregator のソース コードを読んだ後、camel は 1 つのメッセージを 1 つのグループに集約するだけであることがわかりました。この目的のために「アグリゲータ」を構築する必要があります。コードは次のとおりです。

public abstract class GroupingGenerator<I> implements Processor {
private final EvictingQueue<I> queue;
private final int size;

public int getSize() {
    return size;
}

public GroupingGenerator(int size) {
    super();
    this.size = size;
    this.queue = EvictingQueue.create(size);
}

@SuppressWarnings("unchecked")
@Override
public void process(Exchange exchange) throws Exception {
    queue.offer((I) exchange.getIn().getBody());
    if (queue.size() != size) {
        exchange.setProperty(Exchange.ROUTE_STOP, true);
        return;
    } else {
        processGroup(queue, exchange);
    }
}

protected abstract void processGroup(Collection<I> items, Exchange exchange);

}
于 2013-11-14T14:40:01.783 に答える