1

次の問題があります: partitionId プロパティ (たとえば 0-10) を持つイベントのリストが与えられた場合、同じ partitionId を持つイベントが受信順に処理されるように、着信イベントを paritionId に従って分割したいと考えています。 . ほぼ均等に分散すると、(パーティションごとに) 10 個のイベントが並行して処理されることになります。

10 個のシングルスレッド ディスパッチャを作成し、適切なディスパッチャにイベントを送信する以外に、Project Reactor を使用して上記を達成する方法はありますか?

ありがとう。

4

1 に答える 1

0

以下のコード

  • ソース ストリームをパーティションに分割し、
  • パーティションごとに 1 つの「レール」である ParallelFlux を作成し、
  • 「レール」を別々のスレッドにスケジュールし、
  • 結果を収集します

各パーティションに専用のスレッドがあると、その値が元の順序で処理されることが保証されます。

@Test
public void partitioning() throws InterruptedException {
    final int N = 10;
    Flux<Integer> source = Flux.range(1, 10000).share();
    // partition source into publishers
    Publisher<Integer>[] publishers = new Publisher[N];
    for (int i = 0; i < N; i++) {
        final int idx = i;
        publishers[idx] = source.filter(v -> v % N == idx);
    }
    // create ParallelFlux each 'rail' containing single partition
    ParallelFlux.from(publishers)
            // schedule partitions into different threads
            .runOn(Schedulers.newParallel("proc", N))
            // process each partition in its own thread, i.e. in order
            .map(it -> {
                String threadName = Thread.currentThread().getName();
                Assert.assertEquals("proc-" + (it % 10 + 1), threadName);
                return it;
            })
            // collect results on single 'rail'
            .sequential()
            // and on single thread called 'subscriber-1'
            .publishOn(Schedulers.newSingle("subscriber"))
            .subscribe(it -> {
                String threadName = Thread.currentThread().getName();
                Assert.assertEquals("subscriber-1", threadName);
            });
    Thread.sleep(1000);
}
于 2017-01-09T16:29:41.553 に答える