6

それぞれ値を持つオブジェクトの 2 つのストリームがありTimestampます。両方のストリームが順番に並べられているため、たとえば、タイムスタンプは、一方のストリームでは T a = 、もう1,3,6,6,7一方のストリームでは T b =1,2,5,5,6,8である可能性があります。両方のストリームのオブジェクトは同じ型です。

私ができるようにしたいのは、これらの各イベントをタイムスタンプの順にバスに配置することです。つまり、A 1、次に B 1、B 2、A 3などを配置します。さらに、一部のストリームには同じタイムスタンプを持つ複数の (連続した) 要素があるため、これらの要素をグループ化して、新しい各イベントが配列になるようにします。したがって、[A 3 ] をバスに配置し、その後に [A 1 5 ,A 2 5 ] などを配置します。

これを実装しようとしました。2 つのConcurrentQueue構造を作成し、各イベントをキューの後ろに置き、次にキューの各前を見て、最初に前のイベントを選択してから、このタイムスタンプを持つすべてのイベントが存在するようにキューをトラバースします。 .

ただし、次の 2 つの問題が発生しました。

  • これらのキューを無制限のままにしておくと、イベントを受け取るハンドラーよりも読み取り操作の方がはるかに高速であるため、メモリがすぐに不足します。(数ギガバイトのデータがあります)。
  • A 2 5が到着する前に、A 1 5などのイベントを処理する状況になることがあります。どうにかしてこれを防ぐ必要があります。

私は Rx がこの点で役立つと考えていますが、これを可能にする明白なコンビネータは見当たりません。したがって、アドバイスは大歓迎です。

4

1 に答える 1

10

Rxは確かにこの問題のIMOにぴったりです。

IObservables明らかな理由で「OrderBy」を実行することはできません(正しい出力順序を保証するには、最初にストリーム全体を監視する必要があります)。したがって、以下の私の答えは、2つのソースイベントストリームが正常であると仮定しています。

結局、それは興味深い問題でした。次のグループの最初の要素が観測されたときに観測可能な前のグループをGroupByUntilChanged呼び出している限り、標準のRx演算子にはこれを簡単に解決できるaがありません。OnCompleteただし、その実装を見ると、DistinctUntilChangedこのパターンには従わずOnComplete、ソースのobservableが完了したときにのみ呼び出されます(最初の非個別の要素の後に要素がなくなることがわかっている場合でも...奇妙な???)。とにかく、それらの理由で、私はGroupByUntilChanged(Rx規則を破らないように)方法に反対することに決め、代わりに。を選びましたToEnumerableUntilChanged

免責事項:これは私の最初のRx拡張機能なので、私の選択に関するフィードバックをいただければ幸いです。また、私の主な関心事の1つは、distinctElementsリストを保持している匿名のオブザーバブルです。

まず、アプリケーションコードは非常に単純です。

    public class Event
    {
        public DateTime Timestamp { get; set; }
    }

    private IObservable<Event> eventStream1;
    private IObservable<Event> eventStream2; 

    public IObservable<IEnumerable<Event>> CombineAndGroup()
    {
        return eventStream1.CombineLatest(eventStream2, (e1, e2) => e1.Timestamp < e2.Timestamp ? e1 : e2)
            .ToEnumerableUntilChanged(e => e.Timestamp);
    }

次に、ToEnumerableUntilChanged実装(コードの壁の警告)について説明します。

    public static IObservable<IEnumerable<TSource>> ToEnumerableUntilChanged<TSource,TKey>(this IObservable<TSource> source, Func<TSource,TKey> keySelector)
    {
        // TODO: Follow Rx conventions and create a superset overload that takes the IComparer as a parameter
        var comparer = EqualityComparer<TKey>.Default;

        return Observable.Create<IEnumerable<TSource>>(observer =>
        {
            var currentKey = default(TKey);
            var hasCurrentKey = false;
            var distinctElements = new List<TSource>();

            return source.Subscribe((value =>
            {
                TKey elementKey;
                try
                {
                    elementKey = keySelector(value);
                }
                catch (Exception ex)
                {
                    observer.OnError(ex);
                    return;
                }

                if (!hasCurrentKey)
                {
                    hasCurrentKey = true;
                    currentKey = elementKey;
                    distinctElements.Add(value);
                    return;
                }

                bool keysMatch;
                try
                {
                    keysMatch = comparer.Equals(currentKey, elementKey);
                }
                catch (Exception ex)
                {
                    observer.OnError(ex);
                    return;
                }

                if (keysMatch)
                {
                    distinctElements.Add(value);
                    return;
                }

                observer.OnNext( distinctElements);

                distinctElements.Clear();
                distinctElements.Add(value);
                currentKey = elementKey;

            }), observer.OnError, () =>
            {
                if (distinctElements.Count > 0)
                    observer.OnNext(distinctElements);

                observer.OnCompleted();
            });
        });
    }
于 2012-03-17T01:31:40.783 に答える