5

Observable.Buffer をバッファ時間の終了前に強制的にフラッシュする方法はありますか?

例では:

mSubscription = mFluxObservable.Buffer(new TimeSpan(0, 0, 1, 30)).Subscribe(o => saver(o, iSessionId));

1:30 ピリオドが終了する前にデータをフラッシュしたい!

4

2 に答える 2

9

これは私のために働いた:

var subject = new Subject<Unit>();
var closing = Observable
    .Timer(new TimeSpan(0, 0, 1, 30))
    .Select(x => Unit.Default);

var query =
    mFluxObservable
        .Buffer(() => Observable
            .Amb(subject, closing)
            .Take(1));

今、私subject.OnNext(Unit.Default)はバッファを強制的にフラッシュするために呼び出す必要があるだけです。フラッシュの直後に新しいバッファが開始されます。

于 2012-10-17T23:21:59.427 に答える
3

...そして基本的にウィンドウを使用した同じプリンシパル:

var bufferPeriod = TimeSpan.FromSeconds(1.5);
var source = Observable.Interval(TimeSpan.FromMilliseconds(100)).Take(50);

//source.Buffer(bufferPeriod).Dump();

var bufferFlush = new Subject<long>();//Or Subject<Unit>
source.Window(
        ()=>Observable.Merge(Observable.Timer(bufferPeriod), bufferFlush))
    .Select(window=>window.ToList())
    .Dump();

//Simulate calling flush.
Observable.Interval(TimeSpan.FromMilliseconds(1350)).Take(2).Subscribe(bufferFlush);
于 2012-10-18T10:39:15.183 に答える