1

ストリーム B が発生するたびに、ストリーム A をちょうど 1 つの通知で停止したいと考えています。どちらのストリームもオンラインのままで、完了することはありません。

A: o--o--o--o--o--o--o--o--o  
B: --o-----o--------o-------  
R: o-----o-----o--o-----o--o  

また

A: o--o--o--o--o--o--o--o--o  
B: -oo----oo-------oo-------  
R: o-----o-----o--o-----o--o  
4

2 に答える 2

2

このソリューションは、オブザーバブルがホットな場合 (および なしrefCount) に機能します。

streamA
    .takeUntil(streamB)
    .skip(1)
    .repeat()
    .merge(streamA.take(1))
    .subscribe(console.log);
  1. .takeUntil(streamB): ストリームが値を生成Aしたら、ストリームを完了にしBます。
  2. .skip(1)A:開始時に (または の結果として)ストリームが 1 つの値をスキップするようにします.repeat()
  3. .repeat(): ストリームAを無限に繰り返す (再接続する) ようにします。
  4. .merge(streamA.take(1)).skip(1):ストリームの先頭での効果を相殺します。

A ストリームを 5 秒ごとにスキップさせる例:

var streamA,
    streamB;

streamA = Rx.Observable
    .interval(1000)
    .map(function (x) {
        return 'A:' + x;
}).publish();

streamB = Rx.Observable
    .interval(5000);

streamA
    .takeUntil(streamB)
    .skip(1)
    .repeat()
    .merge(streamA.take(1))
    .subscribe(console.log);

streamA.connect();

このサンドボックスhttp://jsbin.com/gijorid/4/edit?js,consoleを使用BACTION()して、コードの実行時にコンソール ログで実行し、値を手動でプッシュすることもできstreamBます (これはコードの分析に役立ちます)。 )。

于 2011-05-08T14:15:12.970 に答える
2

これは、同様の質問に対して私が行ったバージョンのSkipWhenオペレーターです(違いは、元の複数の「B」が複数の「A」をスキップすることです)。

public static IObservable<TSource> SkipWhen<TSource, TOther>(this IObservable<TSource> source, 
    IObservable<TOther> other)
{
    return Observable.Create<TSource>(observer =>
    {
        object lockObject = new object();
        bool shouldSkip = false;

        var otherSubscription = new MutableDisposable();
        var sourceSubscription = new MutableDisposable();

        otherSubscription.Disposable = other.Subscribe(
            x => { lock(lockObject) { shouldSkip = true; } });

        sourceSubscription.Disposable = source.Where(_ =>
        {
            lock(lockObject)
            {
                if (shouldSkip)
                {
                    shouldSkip = false;
                    return false;
                }
                else
                {
                    return true;
                }
            }
        }).Subscribe(observer);

        return new CompositeDisposable(
            sourceSubscription, otherSubscription);
    });
}

現在の実装がボトルネックになる場合は、ロックの実装を変更して を使用することを検討してReaderWriterLockSlimください。

于 2011-05-08T21:15:28.270 に答える