7

rx で Observable.Buffer を使用する方法の例を探していましたが、ボイラー プレートの時間バッファリングされたものよりも実質的なものを見つけることができません。

「bufferClosingSelector」を指定するためのオーバーロードがあるようですが、私はそれを気にすることはできません。

私がやろうとしているのは、時間または「累積」によってバッファリングするシーケンスを作成することです。すべてのリクエストに何らかの重みがあり、一度に x 累積された重みを超えて処理したくないリクエスト ストリームを考えてみます。または、十分な量が蓄積されていない場合は、最後の時間枠 (通常のバッファ機能)

4

1 に答える 1

15

bufferClosingSelectorバッファが閉じられると予想されるときに値を生成する Observable を取得するために毎回呼び出される関数です。

例えば、

source.Buffer(() => Observable.Timer(TimeSpan.FromSeconds(1)))通常のBuffer(time)オーバーロードのように機能します。

シーケンスに重みを付けたい場合は、シーケンスに a を適用しScanてから、集計条件を決定できます。

たとえば、source.Scan((a,c) => a + c).SkipWhile(a => a < 100)ソース シーケンスの合計が 100 を超えた場合に値を生成するシーケンスを提供します。

Ambこれら 2 つの終了条件を競合させて、どちらが最初に反応するかを確認できます。

        .Buffer(() => Observable.Amb
                     (
                          Observable.Timer(TimeSpan.FromSeconds(1)), 
                          source.Scan((a,c) => a + c).SkipWhile(a => a < 100)
                     )
               )

その時点で閉じられるバッファの任意の値を生成する一連のコンビネータを使用できます。

注: 終了セレクターに指定された値は重要ではありません。重要なのは通知です。したがって、異なるタイプのソースを組み合わせるには、Amb単純に に変更しSystem.Reactive.Unitます。

Observable.Amb(stream1.Select(_ => new Unit()), stream2.Select(_ => new Unit())
于 2012-03-05T21:34:29.437 に答える