0

Rx (リアクティブ フレームワーク) を使用してバスを実装していますが、これまでのところ良さそうです。私が今直面している問題は、ストリームの開始前にイベントを追加する方法です。

より多くのコンテキストを提供するために、これはイベント ソース (CQRS/ES 風) 用です。バスが開始すると、多数のサブスクライバーがIObservableを取得します。この時点で、バスはサブスクライバーに開始する必要があるイベント番号/シーケンスを尋ねます。イベントはディスクから最小番号からロードされ、ストリームに追加されます。各サブスクライバーは、where ステートメントを使用して適切なイベントから開始します。アプリケーションが実行されると、新しいイベントが追加され、サブスクライバーに送られます。この部分はうまく機能します。

アタッチが遅くても、すべてのイベントが必要な一部のサブスクライバー。イベントの数が十分に少ない限り、ReplaySubjectは法案に適合しているように見えます。私はおそらくディスク/オフライン キャッシングを実行して、それらをより多く維持できると考えています (任意のポインターを歓迎します!)。

より大きな問題は、一部のサブスクライバーが IObservable を取得するときに、最初に読み込まれたイベントよりも前に発生したイベントを取得する必要があることです。以下のような状況。

バスが開始すると、イベント #50 以降がロードされました (最も早い時期)。ここで、新しいサブスクライバーが IObservable を要求しますが、#20 以降は必要です。だから私がやりたいのは、20-49 をロードして、ストリームの開始前に追加することです。既存のサブスクライバーにはイベントが表示されません (Where によってフィルター処理されます)。

これは可能であるように思われますが、私はそれを理解することができません。これはRxで可能ですか?もしそうなら、どのように?

ありがとう!エリック

4

2 に答える 2

3

私はこれを非常に単純な Rx の方法で見ていきます。を使用しないでください。すべての「ライブ」イベントにReplaySubject1 つSubjectの を使用し、サブスクライバーごとにこれの前に履歴イベントを連結します。

履歴記録を追跡する何らかのデータ構造があると思います。#50 からイベントをロード/キャプチャして #80 を既にメモリに格納している場合、#20 からのイベントを要求する新しいサブスクライバーが来たら、次のようにします。

var fromIndex = 20;
var eventSubject = new Subject<Event>();
capturedEvents
    .GetFromIndex(fromIndex) // Gets a enumerable list of events
    .ToObservable()
    .Concat(eventSubject)
    .Subscribe( ... );

次に、次のeventSubjectように新しいイベントをキャプチャするために使用できます。

eventSubject.Subscribe(capturedEvents.CaptureEvent);

これはあなたのニーズを満たしていますか?

于 2012-05-12T07:19:32.490 に答える
2

ちょうどどうですか:

itemsFromTwentyToFourtyNine.Concat(currentItems);
于 2012-05-12T06:00:41.467 に答える