ある時点でほぼ同時に 1000 のイベントを発生させるアプリケーションがあります。私がやりたいことは、イベントを 50 アイテムのチャンクにバッチ処理し、10 秒ごとに処理を開始することです。新しいバッチ処理を開始する前に、バッチが完了するのを待つ必要はありません。
例えば:
10:00:00: 10000 new events received
10:00:00: StartProcessing (events.Take(50))
10:00:10: StartProcessing (events.Skip(50).Take(50))
10:00:15: StartProcessing (events.Skip(100).Take(50))
これを達成する方法はありますか?Reactive Extensions が進むべき道だと思いますが、他のソリューションも受け入れられます。
私はここから始めようとしました:
var bufferedItems = eventAsObservable
.Buffer(15)
.Delay(TimeSpan.FromSeconds(5)
しかし、遅延が期待どおりに機能せず、代わりにすべてのバッチが同時に開始されたことに気付きましたが、5 秒遅れました。
Window メソッドもテストしましたが、動作に違いはありませんでした。Window の TimeSpan は、実際には「次の 10 秒間に発生するすべてのイベントを取得する:
var bufferedItems = eventAsObservable
.Window(TimeSpan.FromSeconds(10), 5)
.SelectMany(x => x)
.Subscribe(DoProcessing);
Rx-Main 2.0.20304-beta を使用しています。