4
 public interface Event
 {
      Guid identifier;
      Timestamp ts;
 }

私の金融会社の問題を書き直すために Reactive Extensions を使用することを考えています。

前提は、Guid (株式シンボル + 埋め込まれた一意性エントロピー)、タイムスタンプ、および Value フィールドによって識別されるイベントを取得することです。これらは高い頻度で発生し、「少なくとも」X 秒 (10 秒) 後までこれらのオブジェクトに対処することはできません。その後、それらに対処し、システムから削除する必要があります。

「10 秒」の初期ウィンドウ (T0 から T10 など) の 2 つのウィンドウのように考えてください。ここですべての一意のイベント (基本的には GUID ごとにグループ化) を識別し、次の「10 秒」を調べます。セカンダリ ウィンドウ" (T10-T20)、"少なくとも" 10 秒のポリシーを実装していることを確認します。「最初のウィンドウ」からすべてのイベントを削除し (考慮済みであるため)、「2 次ウィンドウ」から「最初のウィンドウ」で発生したイベントを削除します。そして、10 秒のスライディング ウィンドウを移動し続けるので、今度はウィンドウ T20-T30 を見て、繰り返してすすぎます。

これを Rx に実装するにはどうすればよいでしょうか。

4

1 に答える 1

5

メッセージのサーバー クロックとタイムスタンプを信頼できる場合 (つまり、「実生活」モードにある場合)、ジャンプする 10 秒のウィンドウではなく、スライドする 10 秒の遅延の後である場合は、イベントを10秒遅らせることができます:

var events = new Subject<Event>();  
var delayedEvents = events.Delay(TimeSpan.FromSeconds(10));

一意のイベントなどを確認するには、それらを何らかのセットに追加するだけです:

var guidSet = new HashSet<Guid>();  
delayedEvents.Do(e => guidSet.Add(e.identifier));

代わりに、10 秒待ってから最後の 10 秒を一度に処理する必要があるという問題がある場合は、代わりに 10 秒間バッファリングするだけです。

var bufferedEvents = events.Buffer(TimeSpan.FromSeconds(10));
bufferedEvents.Do(es => { foreach (var e in es) guidSet.Add(e.identifier); });

10 秒のスライド ウィンドウの例は示していません。


今、私たちは真剣になります。ウォール タイムに依存するのではなく、イベント内の時間を使用してロジックを駆動したいとします。イベントが次のように再定義されていると仮定します。

 public class Event
 {
      public Guid identifier;
      public DateTime ts;
 }

履歴スケジューラを作成し、スケジュールされたイベントを元のイベントからフィードします。

var scheduler = new HistoricalScheduler();
var driveSchedule = events.Subscribe(e => scheduler.AdvanceTo(e.ts));   
var target = events.SelectMany(e => Observable.Timer(e.ts, scheduler).Select(_ => e));

これで、通常の Rx コンビネータをtargetの代わりに使用しeventて、スケジューラを通過させるだけで、適切にトリガーされるようになります。次に例を示します。

var bufferedEvents = target.Buffer(TimeSpan.FromSeconds(10), scheduler);

ここに簡単なテストがあります。「実質的に」30 秒間隔で 100 個のイベントを作成しますが、毎秒リアルタイムでトリガーされます。

var now = DateTime.Now;
var test = Enumerable.Range(0,99).Select(i =>
    Scheduler.ThreadPool.Schedule(
        TimeSpan.FromSeconds(i), 
        () => events.OnNext(new Event() { 
            identifier = Guid.NewGuid(), 
            ts = now.AddSeconds(i * 30) 
        })
    )
).ToList();

それをサブスクライブし、60 秒間のバッファーされたイベントを要求します。実際には、2 '実際の' 秒 (60 仮想秒) ごとに 2 つのイベントを受け取ります。

target.Select(e => String.Format("{0} {1}", e.identifier, e.ts.ToString()))
      .Buffer(TimeSpan.FromSeconds(60), scheduler)
      .Select(es => String.Join(" - ", es))
      .DumpLive();
于 2012-05-04T21:49:26.933 に答える