19

急速なバーストでイベントを生成するObservableシーケンスがあります(つまり、次々に5つのイベント、次に長い遅延、次に別の高速バーストのイベントなど)。イベント間に短い遅延を挿入して、これらのバーストをスムーズにします。例として次の図を想像してみてください。

生:-oooo -------------- ooooo ----- oo ---------------- ooo |
バッファリング:-o--o--o--o -------- o--o--o--o--o--o--o --------- o--o--o |

Observable.Interval()私の現在のアプローチは、生のストリームから別のイベントをプルしても問題がないときに、その信号を介してメトロノームのようなタイマーを生成することです。問題は、そのタイマーをバッファリングされていない生の監視可能なシーケンスと組み合わせる方法がわからないことです。

IObservable.Zip()は私がやりたいことを実行するのに近いですが、生のストリームがタイマーよりも速くイベントを生成している場合にのみ機能します。生のストリームに大きな落ち込みがあるとすぐに、タイマーは一連の不要なイベントを構築し、すぐに生のストリームからのイベントの次のバーストとペアになります。

理想的には、上記で概説した動作を生成する次の関数シグネチャを持つIObservable拡張メソッドが必要です。今、私の救助に来てくださいStackOverflow :)

public static IObservable<T> Buffered(this IObservable<T> src, TimeSpan minDelay)

PS。私はRxを初めて使用するので、これが簡単な質問である場合はお詫びします...


1.シンプルでありながら欠陥のあるアプローチ

これが私の最初の素朴で単純な解決策であり、かなりの数の問題があります。

public static IObservable<T> Buffered<T>(this IObservable<T> source, TimeSpan minDelay)
{
    Queue<T> q = new Queue<T>();
    source.Subscribe(x => q.Enqueue(x));
    return Observable.Interval(minDelay).Where(_ => q.Count > 0).Select(_ => q.Dequeue());
}

これに関する最初の明らかな問題は、内部サブスクリプションによってrawソースに返されたIDisposableが失われるため、サブスクリプションを終了できないことです。このメソッドによって返されたIDisposableでDisposeを呼び出すと、タイマーは強制終了されますが、キューからイベントをプルするために誰も残っていない状態でキューを不必要に埋めている基になる生のイベントフィードは強制終了されません。

2番目の問題は、例外またはストリーム終了通知がrawイベントストリームからバッファリングされたストリームに伝播される方法がないことです。これらは、rawソースにサブスクライブするときに単に無視されます。

そして最後に大事なことを言い忘れましたが、実際にやるべきことがあるかどうかに関係なく定期的にウェイクアップするコードがあります。これは、この素晴らしい新しいリアクティブな世界では避けたいと思います。


2.非常に複雑なアプローチ

最初の単純なアプローチで発生した問題を解決するために、次のように動作するはるかに複雑な関数を作成しましたIObservable.Delay()(.NET Reflectorを使用してそのコードを読み取り、関数の基礎として使用しました)。AnonymousObservable残念ながら、 system.reactiveコードの外部では公開されていないなど、定型的なロジックの多くは、多くのコードをコピーして貼り付ける必要がありました。このソリューションは機能しているように見えますが、その複雑さを考えると、バグがないという自信はありません。

標準のReactive拡張機能のいくつかの組み合わせを使用してこれを達成する方法がないことを信じることができません。不必要に車輪の再発明をしているような気がするのは嫌いで、作成しようとしているパターンはかなり標準的なもののようです。

4

1 に答える 1

10

これは実際には、バッファリングされたイベントを等間隔でプッシュするAの方法の複製ですが、ここに要約を含めます(元の方法は、いくつかの選択肢があるため、かなり混乱しているように見えます)。

public static IObservable<T> Buffered<T>(this IObservable<T> source, TimeSpan minDelay)
{
    return source.Drain(x => 
        Observable.Empty<int>()
            .Delay(minDelay)
            .StartWith(x)
    );
}

Drainの私の実装はSelectMany、前の出力が最初に終了するのを待つことを除いて、のように機能します(これは、と考えることができますがConactManySelectManyより似ていMergeManyます)。ビルトインDrainはこのようには機能しないため、以下の実装を含める必要があります。

public static class ObservableDrainExtensions
{
    public static IObservable<TOut> Drain<TSource, TOut>(
        this IObservable<TSource> source, 
        Func<TSource, IObservable<TOut>> selector)
    {
        return Observable.Defer(() =>
        {
            BehaviorSubject<Unit> queue = new BehaviorSubject<Unit>(new Unit());

            return source
                .Zip(queue, (v, q) => v)
                .SelectMany(v => selector(v)
                    .Do(_ => { }, () => queue.OnNext(new Unit()))
                );
        });
    }
}
于 2010-12-22T09:46:47.390 に答える