最後に、TempObservable<T>
非常に単純で、サンプル コードは以下のようなものを介して実装する方法についていくつかのアイデアを得ました。
private class TempObservable<T> : ISubject<T>
{
private List<IObserver<T>> observers = new List<IObserver<T>>();
//private IObserver<T> observer;
#region IObservable<T> Members
public IDisposable Subscribe(IObserver<T> observer)
{
this.observers.Add(observer);
return new UnSubscribe<T>(this.observers, observer);
}
#endregion
#region IObserver<T> Members
public void OnCompleted()
{
throw new NotImplementedException();
}
public void OnError(Exception e)
{
this.observers.ForEach((ob) => ob.OnError(e));
}
public void OnNext(T value)
{
this.observers.ForEach((ob) => ob.OnNext(value));
}
#endregion
}
次に、次のWhere<T>
ように独自に記述できます。
public static IObservable<T> Where<T>(this IObservable<T> source, Func<T, bool> predict)
{
ISubject<T> filteredObservable = new TempObservable<T>();
source.Subscribe(s => { if (predict(s)) { filteredObservable.OnNext(s); } }, () => { filteredObservable.OnCompleted(); }, ex => { filteredObservable.OnError(ex); });
return filteredObservable;
}
他の便利な拡張機能についても同様です。