0

こんにちは、apache camel または spring 統合を活用して、プロセス金融市場データをストリーミングして取引シグナルを計算しようとしています。使用例の 1 つは、次のように、価格のタイムスタンプに基づいて連続する価格を集計することです。

  • 入力

入力メッセージは、時系列の (timestamp,price) ペアとして提供されます。入ってくる値を次のように仮定します。各ペア (TX、PX) はメッセージであり、T はタイムスタンプ、P は価格値です。

(T0,P1),(T1,P1),(T2,P2),(T3,P3),(T4,P4)... 
  • 集計

次のグループを作成する必要がある入力メッセージが与えられた場合、3 つの連続したメッセージごとに、さらに計算するために一緒に集約する必要があるとします。各 3 ペアのグループは集約されたメッセージです。

[(T0,P1),(T1,P1),(T2,P2)],
[(T1,P1),(T2,P2),(T3,P3)],
[(T2,P2),(T3,P3),(T4,P4)],
....

ご覧のとおり、ほとんどのメッセージは複数のグループに集約されます。現在のアグリゲーターを作成せずに使用してこれを行う方法があるかどうか、誰かが提案できますか。

Spring 統合の集計グループ化も相関キーに基づいているようです。そのため、メッセージは相関キーのグループにマップする必要があります。ただし、現在の API では 1 つの相関キーしか生成できないようです。つまり、各メッセージは 1 つのグループにしか集約できません。これに対する回避策はありますか。

PS

キャメルのソースコードを読んだ後、キャメルは私たちの要件をサポートできないようです。春と一緒に運を試してみてください。指を交差させた ラクダの質問

4

1 に答える 1

1

すぐに使用できるものは何もありませんが、SimpleMessageStore. RollingMessageStore Gistに全文を掲載しました。

removeGroup肝心なのは、グループ全体ではなく、最初のメッセージのみを削除するように変更することです。また、completeGroup何もしないでください。

expreGroupOnCompletionアグリゲーターがグループを強制的に「削除」するように設定します (変更されたremoveGroup()メソッドを呼び出します。

SimpleMessageGroupとの違いは次のとおりRollingMessageGroupです...

182,184c190,194
< 
<               groupUpperBound.release(groupIdToMessageGroup.get(groupId).size());
<               groupIdToMessageGroup.remove(groupId);
---
>               Message<?> message = this.groupIdToMessageGroup.get(groupId).getOne();
>               if (message != null) {
>                   this.groupUpperBound.release(1);
>                   this.removeMessageFromGroup(groupId, message);
>               }

(さらに、 内のすべてのコードを削除しますcompleteGroup()

そしてテストケース...

@Test
public void testRolling() {
    AggregatingMessageHandler aggregator = new AggregatingMessageHandler(new MultiplyingProcessor(), new RollingMessageStore());
    aggregator.setExpireGroupsUponCompletion(true);
    aggregator.setReleaseStrategy(new ReleaseStrategy() {

        @Override
        public boolean canRelease(MessageGroup group) {
            return group.size() == 3;
        }
    });
    QueueChannel replyChannel = new QueueChannel();
    Message<?> message1 = createMessage(3, "ABC", 3, 1, replyChannel, null);
    Message<?> message2 = createMessage(5, "ABC", 3, 2, replyChannel, null);
    Message<?> message3 = createMessage(7, "ABC", 3, 3, replyChannel, null);
    Message<?> message4 = createMessage(9, "ABC", 3, 3, replyChannel, null);
    Message<?> message5 = createMessage(11, "ABC", 3, 3, replyChannel, null);

    aggregator.handleMessage(message1);
    aggregator.handleMessage(message2);
    aggregator.handleMessage(message3);
    aggregator.handleMessage(message4);
    aggregator.handleMessage(message5);

    Message<?> reply = replyChannel.receive(10000);
    assertNotNull(reply);
    assertEquals(reply.getPayload(), 105);
    reply = replyChannel.receive(10000);
    assertNotNull(reply);
    assertEquals(reply.getPayload(), 315);
    reply = replyChannel.receive(10000);
    assertNotNull(reply);
    assertEquals(reply.getPayload(), 693);
}

先に進んでJIRA の新機能の問題を開いてください。今後の 3.0 リリースにこれ (またはより一般的な解決策) を追加することを検討します。

使用correlation-strategy-expression="'foo'"して

release-strategy-expression=size()==3.

于 2013-11-14T15:34:46.243 に答える