9

私はReactorを学んでいますが、特定の動作を実現する方法を知りたいと思っています。着信メッセージのストリームがあるとしましょう。各メッセージは特定のエンティティに関連付けられており、いくつかのデータが含まれています。

interface Message {
    String getEntityId();
    Data getData();
}

異なるエンティティに関連するメッセージを並行して処理できます。ただし、単一のエンティティに関連するメッセージは一度に 1 つずつ処理する必要があります。つまり、エンティティのメッセージ 2 の処理は、"abc"エンティティのメッセージ 1 の処理が終了するまで開始できません"abc"。メッセージの処理が進行している間、そのエンティティの後続のメッセージをバッファリングする必要があります。他の実体へのメッセージは妨げられずに進むことができます。次のようなコードを実行しているエンティティごとにスレッドがあると考えることができます。

public void run() {
    for (;;) {
        // Blocks until there's a message available
        Message msg = messageQueue.nextMessageFor(this.entityId);

        // Blocks until processing is finished
        processMessage(msg);
    }
}

ブロックせずにReactでこれを達成するにはどうすればよいですか? 合計メッセージ レートは高くなる可能性がありますが、エンティティごとのメッセージ レートは非常に低くなります。エンティティのセットは非常に大きくなる可能性があり、必ずしも事前にわかっているとは限りません。

おそらくこんな感じだと思いますが、わかりません。

{
    incomingMessages()
            .groupBy(Message::getEntityId)
            .flatMap(entityStream -> entityStream
                    /* ... */
                    .map(msg -> /* process the message */)))
                    /* ... */
}

public static Stream<Message> incomingMessages() { /* ... */ }
4

2 に答える 2