1

特定の条件下で保留中のリクエストを数秒間一時的に保留するために、HTTPリクエストにバックプレッシャー戦略を実装しようとしています。一時停止するロジックは、別の Observable に基づいています。

私の研究と理解により、pausableBufferedオペレーターは私が必要としているものを正確に実行してくれると信じています。ここに文書化されていますhttp://reactivex.io/documentation/operators/backpressure.html

ただし、ReactiveX v5 (5.0.0-beta.0) ではこの演算子を見つけることができず、移行ガイド (v4 - v5) には削除されたことが示されているようです。この場合、v5 の使用可能なオペレーターを使用して、どうすれば目的の結果を得ることができますか?

4

2 に答える 2

3

バックプレッシャーの話は今のところ完全に取り下げられています。

同じ結果を得る 1 つの方法を次に示します。

const pausableBuffered = (observable, pauser) => {
    const subj = new rx.Subject();

    let buffer = [];
    const nextEmitter = x => subj.next(x);
    const nextBuffer = x => buffer.push(x);

    let subscriber = nextEmitter;
    observable.subscribe(x => subscriber(x));

    pauser.subscribe(value => {
        if (value) {
            subscriber = nextBuffer;
        } else {
            buffer.forEach(nextEmitter);
            buffer = [];
            subscriber = nextEmitter;
        }
    })

    return subj;
};
于 2016-01-30T05:35:32.630 に答える
0

私はこの答えに出くわしました。私のユースケースでは、それをパイプに変えました

import { Observable, Subject, Subscription } from "rxjs";

export function pausable(pauseToken: Observable<boolean>, startPuased: boolean, lastOnly: boolean) {
    return function <T>(source: Subject<T>): Observable<T> {
        let buffer: T[] = [];
        const nextEmitter = (x: T) => subj.next(x);
        const nextBuffer = (x: any) => buffer.push(x);

        var sourceSubscription: Subscription;
        var pauseSubscription: Subscription;

        var subj = new Subject<T>();

        let subscriber = nextEmitter;
        if (startPuased) {
            subscriber = nextBuffer;
        }
        sourceSubscription = source.subscribe({
            next(value) {
                subscriber(value);
            },
            error(error) {
                subj.error(error);
            },
            complete() {
                subj.complete();
                pauseSubscription?.unsubscribe();
            }
        })

        pauseSubscription = pauseToken.subscribe({
            next(value) {
                if (value) {
                    subscriber = nextBuffer;
                } else {
                    if (lastOnly && buffer.length > 0) {
                        nextEmitter(buffer.pop())
                    } else {
                        buffer.forEach(nextEmitter);
                    }
                    buffer = [];
                    subscriber = nextEmitter;
                }
            },
            complete() {
                sourceSubscription?.unsubscribe();
                pauseSubscription?.unsubscribe();
            }
        });

        return subj;
    }
}

于 2021-03-10T21:57:36.213 に答える