Observable.Buffer をバッファ時間の終了前に強制的にフラッシュする方法はありますか?
例では:
mSubscription = mFluxObservable.Buffer(new TimeSpan(0, 0, 1, 30)).Subscribe(o => saver(o, iSessionId));
1:30 ピリオドが終了する前にデータをフラッシュしたい!
Observable.Buffer をバッファ時間の終了前に強制的にフラッシュする方法はありますか?
例では:
mSubscription = mFluxObservable.Buffer(new TimeSpan(0, 0, 1, 30)).Subscribe(o => saver(o, iSessionId));
1:30 ピリオドが終了する前にデータをフラッシュしたい!
これは私のために働いた:
var subject = new Subject<Unit>();
var closing = Observable
.Timer(new TimeSpan(0, 0, 1, 30))
.Select(x => Unit.Default);
var query =
mFluxObservable
.Buffer(() => Observable
.Amb(subject, closing)
.Take(1));
今、私subject.OnNext(Unit.Default)
はバッファを強制的にフラッシュするために呼び出す必要があるだけです。フラッシュの直後に新しいバッファが開始されます。
...そして基本的にウィンドウを使用した同じプリンシパル:
var bufferPeriod = TimeSpan.FromSeconds(1.5);
var source = Observable.Interval(TimeSpan.FromMilliseconds(100)).Take(50);
//source.Buffer(bufferPeriod).Dump();
var bufferFlush = new Subject<long>();//Or Subject<Unit>
source.Window(
()=>Observable.Merge(Observable.Timer(bufferPeriod), bufferFlush))
.Select(window=>window.ToList())
.Dump();
//Simulate calling flush.
Observable.Interval(TimeSpan.FromMilliseconds(1350)).Take(2).Subscribe(bufferFlush);