以下のコード
- ソース ストリームをパーティションに分割し、
- パーティションごとに 1 つの「レール」である ParallelFlux を作成し、
- 「レール」を別々のスレッドにスケジュールし、
- 結果を収集します
各パーティションに専用のスレッドがあると、その値が元の順序で処理されることが保証されます。
@Test
public void partitioning() throws InterruptedException {
final int N = 10;
Flux<Integer> source = Flux.range(1, 10000).share();
// partition source into publishers
Publisher<Integer>[] publishers = new Publisher[N];
for (int i = 0; i < N; i++) {
final int idx = i;
publishers[idx] = source.filter(v -> v % N == idx);
}
// create ParallelFlux each 'rail' containing single partition
ParallelFlux.from(publishers)
// schedule partitions into different threads
.runOn(Schedulers.newParallel("proc", N))
// process each partition in its own thread, i.e. in order
.map(it -> {
String threadName = Thread.currentThread().getName();
Assert.assertEquals("proc-" + (it % 10 + 1), threadName);
return it;
})
// collect results on single 'rail'
.sequential()
// and on single thread called 'subscriber-1'
.publishOn(Schedulers.newSingle("subscriber"))
.subscribe(it -> {
String threadName = Thread.currentThread().getName();
Assert.assertEquals("subscriber-1", threadName);
});
Thread.sleep(1000);
}