6

RxJava2最近、背圧がどのように機能するかを理解していないことに気付きました。

私は小さなテストを作成しましたが、MissingBackpressureException例外で失敗するはずです:

@Test
public void testBackpressureWillFail() {
    Observable.<Integer>create(e -> {
        for (int i = 0; i < 10000; i++) {
            System.out.println("Emit: " + i);
            e.onNext(i);
        }
        e.onComplete();
    })
    .subscribeOn(Schedulers.newThread())
    .observeOn(Schedulers.computation())
    .doOnNext(i -> {
        Thread.sleep(100);
        System.out.println("Processed:" + i);
    })
    .blockingSubscribe();
}

システムアウトは次のように表示されます:

Emit: 0
Emit: 1
Emit: 2
...
Emit: 10000

Processed:0
Processed:1
Processed:2
...
Processed:10000

生成しない理由MissingBackpressureException

サイズがより大きい場合、e.onNext(i);アイテムはバッファに入れられると思いますObservableObserveOnstatic final int BUFFER_SIZE = Math.max(16,Integer.getInteger("rx2.buffer-size",128).intValue());

MissingBackpressureException発生しないものをスローする必要があります。バッファーは自動的に拡張されますか? そうでない場合、アイテムはどこに保管されていますか?

4

1 に答える 1