Rxは確かにこの問題のIMOにぴったりです。
IObservables
明らかな理由で「OrderBy」を実行することはできません(正しい出力順序を保証するには、最初にストリーム全体を監視する必要があります)。したがって、以下の私の答えは、2つのソースイベントストリームが正常であると仮定しています。
結局、それは興味深い問題でした。次のグループの最初の要素が観測されたときに観測可能な前のグループをGroupByUntilChanged
呼び出している限り、標準のRx演算子にはこれを簡単に解決できるaがありません。OnComplete
ただし、その実装を見ると、DistinctUntilChanged
このパターンには従わずOnComplete
、ソースのobservableが完了したときにのみ呼び出されます(最初の非個別の要素の後に要素がなくなることがわかっている場合でも...奇妙な???)。とにかく、それらの理由で、私はGroupByUntilChanged
(Rx規則を破らないように)方法に反対することに決め、代わりに。を選びましたToEnumerableUntilChanged
。
免責事項:これは私の最初のRx拡張機能なので、私の選択に関するフィードバックをいただければ幸いです。また、私の主な関心事の1つは、distinctElements
リストを保持している匿名のオブザーバブルです。
まず、アプリケーションコードは非常に単純です。
public class Event
{
public DateTime Timestamp { get; set; }
}
private IObservable<Event> eventStream1;
private IObservable<Event> eventStream2;
public IObservable<IEnumerable<Event>> CombineAndGroup()
{
return eventStream1.CombineLatest(eventStream2, (e1, e2) => e1.Timestamp < e2.Timestamp ? e1 : e2)
.ToEnumerableUntilChanged(e => e.Timestamp);
}
次に、ToEnumerableUntilChanged
実装(コードの壁の警告)について説明します。
public static IObservable<IEnumerable<TSource>> ToEnumerableUntilChanged<TSource,TKey>(this IObservable<TSource> source, Func<TSource,TKey> keySelector)
{
// TODO: Follow Rx conventions and create a superset overload that takes the IComparer as a parameter
var comparer = EqualityComparer<TKey>.Default;
return Observable.Create<IEnumerable<TSource>>(observer =>
{
var currentKey = default(TKey);
var hasCurrentKey = false;
var distinctElements = new List<TSource>();
return source.Subscribe((value =>
{
TKey elementKey;
try
{
elementKey = keySelector(value);
}
catch (Exception ex)
{
observer.OnError(ex);
return;
}
if (!hasCurrentKey)
{
hasCurrentKey = true;
currentKey = elementKey;
distinctElements.Add(value);
return;
}
bool keysMatch;
try
{
keysMatch = comparer.Equals(currentKey, elementKey);
}
catch (Exception ex)
{
observer.OnError(ex);
return;
}
if (keysMatch)
{
distinctElements.Add(value);
return;
}
observer.OnNext( distinctElements);
distinctElements.Clear();
distinctElements.Add(value);
currentKey = elementKey;
}), observer.OnError, () =>
{
if (distinctElements.Count > 0)
observer.OnNext(distinctElements);
observer.OnCompleted();
});
});
}