3

RxJava のバックプレッシャーに関するいくつかのドキュメントを読んでいますが、ライブラリの内部でどのように発生するかなどの詳細な説明を見つけることができません。「プロデューサー」は速すぎ、「コンシューマー」は遅すぎます。

たとえば、以下のコードのように:

Observable.interval(1, TimeUnit.MILLISECONDS)
    .observeOn(Schedulers.newThread())
    .subscribe(
        i -> {
            System.out.println(i);
            try {
                Thread.sleep(100);
            } catch (Exception e) { }
        },
        System.out::println);

私は RxJava ソースコードを調べてきたので、私の理解では、メインスレッドではミリ秒ごとにイベントを発行し、発行したら System.out.println(i) メソッドに値を渡し、それをnewThead スケジューラのスレッド プールを開き、ランナブル内でメソッドを実行します。

だから私の質問は、どのように例外が内部的に発生するのですか? Thread.sleep() を呼び出すと、メソッド呼び出しを処理するスレッドをスリープ状態にするだけです -> System.out.println() スレッド プール内の他のスレッドに影響を与えることなく、例外が発生します。スレッドプールに使用可能なスレッドが十分にないためですか?

ありがとう

4

1 に答える 1

7

バックプレッシャーは、1 人のオペレーターが上流のソースに渡す許可のシステムと考えることができます。128 個の要素を与えることができます。少し後に、このオペレーターは「よし、もう 96 をくれ」と言うかもしれないので、合計で 224 の未処理の許可が存在する可能性があります。intervalパーミットを気にせず、値を定期的に配布するだけのようなソースもあります。パーミットの数は、通常、キューまたはバッファーで使用可能な容量に強く結び付けられているため、これらのストレージが保持できるよりも多くを配布すると、利回りが発生しますMissingBackpressureException

バックプレッシャ違反の検出は、主に、キューがいっぱいであることを示す場合のofferように、バインドされたキューへの が false を返す場合に発生します。observeOn

違反を検出する 2 つ目の方法は、オペレーターの未処理の許可カウントを追跡することです。たとえばonBackpressureDrop、アップストリームがこれを超える数を送信するたびに、オペレーターはそれを転送しません。

// in onBackpressureDrop
public void onNext(T value) {
    if (emitted != availablePermits) {
        emitted++;
        child.onNext(value);
    } else {
        // ignoring this value
    }
}

子サブスクライバーは request() を介してその許可を通知します。これは通常、次のようになりonBackpressureDropます。

public void childRequested(long n) {
    availablePermits += n;
}

実際には、非同期実行の可能性があるため、availablePermitsAtomicLong(および と呼ばれrequestedます) です。

于 2016-08-12T10:19:31.053 に答える