メッセージのサーバー クロックとタイムスタンプを信頼できる場合 (つまり、「実生活」モードにある場合)、ジャンプする 10 秒のウィンドウではなく、スライドする 10 秒の遅延の後である場合は、イベントを10秒遅らせることができます:
var events = new Subject<Event>();
var delayedEvents = events.Delay(TimeSpan.FromSeconds(10));
一意のイベントなどを確認するには、それらを何らかのセットに追加するだけです:
var guidSet = new HashSet<Guid>();
delayedEvents.Do(e => guidSet.Add(e.identifier));
代わりに、10 秒待ってから最後の 10 秒を一度に処理する必要があるという問題がある場合は、代わりに 10 秒間バッファリングするだけです。
var bufferedEvents = events.Buffer(TimeSpan.FromSeconds(10));
bufferedEvents.Do(es => { foreach (var e in es) guidSet.Add(e.identifier); });
10 秒のスライド ウィンドウの例は示していません。
今、私たちは真剣になります。ウォール タイムに依存するのではなく、イベント内の時間を使用してロジックを駆動したいとします。イベントが次のように再定義されていると仮定します。
public class Event
{
public Guid identifier;
public DateTime ts;
}
履歴スケジューラを作成し、スケジュールされたイベントを元のイベントからフィードします。
var scheduler = new HistoricalScheduler();
var driveSchedule = events.Subscribe(e => scheduler.AdvanceTo(e.ts));
var target = events.SelectMany(e => Observable.Timer(e.ts, scheduler).Select(_ => e));
これで、通常の Rx コンビネータをtarget
の代わりに使用しevent
て、スケジューラを通過させるだけで、適切にトリガーされるようになります。次に例を示します。
var bufferedEvents = target.Buffer(TimeSpan.FromSeconds(10), scheduler);
ここに簡単なテストがあります。「実質的に」30 秒間隔で 100 個のイベントを作成しますが、毎秒リアルタイムでトリガーされます。
var now = DateTime.Now;
var test = Enumerable.Range(0,99).Select(i =>
Scheduler.ThreadPool.Schedule(
TimeSpan.FromSeconds(i),
() => events.OnNext(new Event() {
identifier = Guid.NewGuid(),
ts = now.AddSeconds(i * 30)
})
)
).ToList();
それをサブスクライブし、60 秒間のバッファーされたイベントを要求します。実際には、2 '実際の' 秒 (60 仮想秒) ごとに 2 つのイベントを受け取ります。
target.Select(e => String.Format("{0} {1}", e.identifier, e.ts.ToString()))
.Buffer(TimeSpan.FromSeconds(60), scheduler)
.Select(es => String.Join(" - ", es))
.DumpLive();