12

ある時点でほぼ同時に 1000 のイベントを発生させるアプリケーションがあります。私がやりたいことは、イベントを 50 アイテムのチャンクにバッチ処理し、10 秒ごとに処理を開始することです。新しいバッチ処理を開始する前に、バッチが完了するのを待つ必要はありません。

例えば:

10:00:00: 10000 new events received
10:00:00: StartProcessing (events.Take(50))
10:00:10: StartProcessing (events.Skip(50).Take(50))
10:00:15: StartProcessing (events.Skip(100).Take(50))

これを達成する方法はありますか?Reactive Extensions が進むべき道だと思いますが、他のソリューションも受け入れられます。

私はここから始めようとしました:

        var bufferedItems = eventAsObservable
            .Buffer(15)
            .Delay(TimeSpan.FromSeconds(5)

しかし、遅延が期待どおりに機能せず、代わりにすべてのバッチが同時に開始されたことに気付きましたが、5 秒遅れました。

Window メソッドもテストしましたが、動作に違いはありませんでした。Window の TimeSpan は、実際には「次の 10 秒間に発生するすべてのイベントを取得する:

        var bufferedItems = eventAsObservable
            .Window(TimeSpan.FromSeconds(10), 5)
            .SelectMany(x => x)
            .Subscribe(DoProcessing);

Rx-Main 2.0.20304-beta を使用しています。

4

4 に答える 4

23

スレッドをスリープさせたくない場合は、次のようにします。

var tick = Observable.Interval(TimeSpan.FromSeconds(5));

eventAsObservable
.Buffer(50)
.Zip(tick, (res, _) => res)
.Subscribe(DoProcessing);
于 2012-06-11T13:26:24.487 に答える
2

これだけのための特定の Buffer メソッドのオーバーロードがあります: https://msdn.microsoft.com/en-us/library/hh229200(v=vs.103).aspx

observable.Buffer(TimeSpan.FromSeconds(5), 50);
于 2016-04-08T12:41:27.540 に答える
-1

これを試して:

    var bufferedItems = eventAsObservable
        .Buffer(50)
        .Do(_ => { Thread.Sleep(10000); });
于 2012-06-07T09:38:17.960 に答える