11

別のスレッドで順序付けされていないイベントを並べ替えようとしています。

これらのマーブル ダイアグラムに一致するリアクティブ拡張クエリを作成することは可能ですか。

s1          1   2       3   4

s2          1   3   2       4

result      1       2   3   4

と...

s1          1   2   3   4

s2          4   3   2   1

result                  1234

つまり、結果のみをバージョン番号順に公開します。

最も近いのは、Join を使用して、s1 がティックするたびにウィンドウを開き、s2 が同じ番号で到着したときにのみウィンドウを閉じることです。

このような:

var publishedEvents = events.Publish().RefCount();
publishedEvents.Join(
        publishedEvents.Scan(0, (i, o) => i + 1),
        expectedVersion => publishedEvents.Any(@event => @event.Version == expectedVersion),
        _ => Observable.Never<Unit>(),
        (@event, expectedVersion) => new {@event,expectedVersion})
    .Where(x => x.expectedVersion == x.@event.Version)
    .Select(x => x.@event)
    .Subscribe(Persist);

しかし、それはダイアグラム 2 では機能しません。グループ 2 は、s2 が数 2 でティックすると、したがって 1 の前に完了します。

それは理にかなっていますか?Rxでできますか?それはすべきですか?

編集: 先行するすべてのウィンドウが閉じられる前に後のウィンドウを閉じることができない、重複するウィンドウのようなものだと思います。また、ウィンドウ番号がイベントのバージョン番号と一致するまで、前のウィンドウは閉じません。

編集2

私は今このようなものを持っていますが、それは実際には私が望んでいた反応的、機能的、スレッドセーフな LINQ 啓示ではありません (私のイベントが今のところ JObjects であることを無視してください):

var orderedEvents = Observable.Create<JObject>(observer =>
{
    var nextVersionExpected = 1;
    var previousEvents = new List<JObject>();
    return events
        .ObserveOn(Scheduler.CurrentThread)
        .Subscribe(@event =>
        {
            previousEvents.Add(@event);

            var version = (long) @event["Version"];
            if (version != nextVersionExpected) return;

            foreach (var previousEvent in previousEvents.OrderBy(x => (long) x["Version"]).ToList())
            {
                if ((long) previousEvent["Version"] != nextVersionExpected)
                    break;

                observer.OnNext(previousEvent);
                previousEvents.Remove(previousEvent);
                nextVersionExpected++;
            }
        });
});
4

1 に答える 1