7

イベントのストリームを取り込み、別のイベントのストリームをプッシュするクラスがあります。

すべてのイベントで Reactive Extensions (RX) が使用されます。イベントの着信ストリームは外部ソースからIObserver<T>using.OnNextにプッシュされ、イベントの発信ストリームは and を使用してプッシュされIObservable<T>ます.Subscribe。私はSubject<T>舞台裏でこれを管理するために使用しています。

出力を一時的に一時停止するための RX の手法にはどのようなものがあるのだろうかと考えています。これは、着信イベントが内部キューに蓄積され、一時停止が解除されると、イベントが再び流出することを意味します。

4

3 に答える 3

3

これは、必要なことを行うかなり単純な Rx の方法です。Pausableソース オブザーバブルと、オブザーバブルbooleanを一時停止または再開する2 番目のオブザーバブルを取得するという拡張メソッドを作成しました。

public static IObservable<T> Pausable<T>(
    this IObservable<T> source,
    IObservable<bool> pauser)
{
    return Observable.Create<T>(o =>
    {
        var paused = new SerialDisposable();
        var subscription = Observable.Publish(source, ps =>
        {
            var values = new ReplaySubject<T>();
            Func<bool, IObservable<T>> switcher = b =>
            {
                if (b)
                {
                    values.Dispose();
                    values = new ReplaySubject<T>();
                    paused.Disposable = ps.Subscribe(values);
                    return Observable.Empty<T>();
                }
                else
                {
                    return values.Concat(ps);
                }
            };

            return pauser.StartWith(false).DistinctUntilChanged()
                .Select(p => switcher(p))
                .Switch();
        }).Subscribe(o);
        return new CompositeDisposable(subscription, paused);
    });
}

次のように使用できます。

var xs = Observable.Generate(
    0,
    x => x < 100,
    x => x + 1,
    x => x,
    x => TimeSpan.FromSeconds(0.1));

var bs = new Subject<bool>();

var pxs = xs.Pausable(bs);

pxs.Subscribe(x => { /* Do stuff */ });

Thread.Sleep(500);
bs.OnNext(true);
Thread.Sleep(5000);
bs.OnNext(false);
Thread.Sleep(500);
bs.OnNext(true);
Thread.Sleep(5000);
bs.OnNext(false);

さて、あなたの「入ってくるイベントのストリーム」が何を意味するのか、私が完全に理解できなかったのはIObserver<T>「. ストリームはIObservable<T>. オブザーバーはストリームではありません。ここでは何もしていないようです。質問に追加して、さらに説明していただけますか?

于 2015-07-13T04:55:38.630 に答える
1

で一時停止/一時停止解除をシミュレートできますObservable

pauseObservable が「一時停止」値を発行したら、pauseObservable が「一時停止解除」値を発行するまでイベントをバッファリングします。

Dave Sextonによる BufferUntil 実装とTimothy Shields による Observable ロジックを使用する例を次に示します(私自身の質問から)

        //Input events, hot observable
        var source = Observable.Interval(TimeSpan.FromSeconds(1))
            .Select(i => i.ToString())
            .Publish().RefCount();

       //Simulate pausing from Keyboard, not actually relevant within this answer
        var pauseObservable = Observable.FromEventPattern<KeyPressEventHandler, KeyPressEventArgs>(
            k => KeyPressed += k, k => KeyPressed -= k)
            .Select(i => i.EventArgs.PressedKey)
            .Select(i => i == ConsoleKey.Spacebar) //space is pause, others are unpause
            .DistinctUntilChanged();

        //Let events through when not paused
        var notPausedEvents = source.Zip(pauseObservable.MostRecent(false), (value, paused) => new {value, paused})
            .Where(i => !i.paused) //not paused
            .Select(i => i.value)
            .Subscribe(Console.WriteLine);

        //When paused, buffer until not paused
        var pausedEvents = pauseObservable.Where(i => i)
            .Subscribe(_ =>
                source.BufferUntil(pauseObservable.Where(i => !i))
                    .Select(i => String.Join(Environment.NewLine, i))
                    .Subscribe(Console.WriteLine));

改善の余地 : ソースへの 2 つのサブスクリプション (pausedEvents と notPausedEvents) を 1 つにマージする可能性があります。

于 2015-07-12T15:43:53.117 に答える