0

私たちの製品にインストールされている Rx ライブラリを受け入れるように上司を説得することはできません。そのため、勉強やコーディングに便利なため、公式の Rx で提供されているようないくつかの単純な関数を実装することを好みます。ここでは、まさに今必要なものを 2 つ選びます ( IObservable、IObserver、ISubject などの基本的な概念を既に実装しています):

Where<TSource>(IObservable<TSource>, Func<TSource, Boolean>)
Timeout<TSource>(IObservable<TSource>, DateTimeOffset)

「Where」をどのように実装するかを考えるのに少し時間がかかりますが、ILSpy コードを読んでも何のアイデアも得られませんでした。結局、IEnumerable とは異なります。

4

3 に答える 3

2

最後に、TempObservable<T>非常に単純で、サンプル コードは以下のようなものを介して実装する方法についていくつかのアイデアを得ました。

private class TempObservable<T> : ISubject<T>
    {
        private List<IObserver<T>> observers = new List<IObserver<T>>();
        //private IObserver<T> observer;

        #region IObservable<T> Members

        public IDisposable Subscribe(IObserver<T> observer)
        {
            this.observers.Add(observer);
            return new UnSubscribe<T>(this.observers, observer);
        }

        #endregion


        #region IObserver<T> Members

        public void OnCompleted()
        {
            throw new NotImplementedException();
        }

        public void OnError(Exception e)
        {
            this.observers.ForEach((ob) => ob.OnError(e));
        }

        public void OnNext(T value)
        {
            this.observers.ForEach((ob) => ob.OnNext(value));
        }

        #endregion
    }

次に、次のWhere<T>ように独自に記述できます。

public static IObservable<T> Where<T>(this IObservable<T> source, Func<T, bool> predict)
    {
        ISubject<T> filteredObservable = new TempObservable<T>();
        source.Subscribe(s => { if (predict(s)) { filteredObservable.OnNext(s); } }, () => { filteredObservable.OnCompleted(); }, ex => { filteredObservable.OnError(ex); });
        return filteredObservable;
    }

他の便利な拡張機能についても同様です。

于 2013-08-05T07:14:55.437 に答える
0

.NET 3.5 をサポートするリアクティブ拡張機能 1.x を使用しないのはなぜですか? これが .NET 3.5 SP1 をサポートする最後のリリースだと思います: http://www.microsoft.com/en-us/download/details.aspx?id=28018

rx のソース コードは、 http://rx.codeplex.comでも入手できます。

于 2013-06-06T08:39:10.617 に答える
-2

これは、による回答の実用的なバージョンです@Shawn(彼の回答にはUnSubscribe<T>方法がありません)。@Shawn素晴らしい作品をありがとう!

を理解するために、これは可能な限り単純な実用的な実装であることに注意してくださいSubject<T>。スレッドセーフではありません。

/// <summary>
///     Simplest possible implementation of Subject(T). See http://bit.ly/1QUdpq1.
/// </summary>
/// <typeparam name="T"></typeparam>
public class TempObservable<T> : ISubject<T>, IDisposable
{
    private readonly List<IObserver<T>> observers = new List<IObserver<T>>();

    #region IObservable<T> Members  
    public IDisposable Subscribe(IObserver<T> observer)
    {
        this.observers.Add(observer);

        // Could return ".this", but this would introduce an issue: if one subscriber unsubscribed, it would
        // unsubscribe all subscribers.
        return new Unsubscribe<T>(this.observers, observer);
    }
    #endregion


    #region IObserver<T> Members
    public void OnNext(T value)
    {
        this.observers.ForEach((ob) => ob.OnNext(value));
    }
    public void OnError(Exception e)
    {
        this.observers.ForEach((ob) => ob.OnError(e));
    }
    public void OnCompleted()
    {
        this.Dispose();
    }
    #endregion

    public void Dispose()
    {
        observers.Clear();
    }
}

public class Unsubscribe<T> : IDisposable
{
    private readonly IObserver<T> _observer;
    private readonly List<IObserver<T>> _observers;

    public Unsubscribe(List<IObserver<T>> observers, IObserver<T> observer)
    {
        _observer = observer;
        _observers = observers;
    }

    public void Dispose()
    {
        int index = _observers.IndexOf(_observer);
        _observers.RemoveAt(index);
    }
}

[TestFixture]
public static class TempObservable_Test
{
    [Test]
    public static void UnitTest()
    {

        TempObservable<int> x = new TempObservable<int>();
        x.Subscribe(o =>
        {
            Console.Write("Test 1: {0}\n", o);
        });
        var toDispose = x.Subscribe(o =>
        {
            Console.Write("Test 2: {0}\n", o);
        });

        x.OnNext(1);
        toDispose.Dispose();
        x.OnNext(2);
        x.Dispose();
        x.OnNext(3);

        x.Where(o => o == 5).Subscribe(o =>
        {
            Console.Write("Tested: {0}\n", o);
        });
        x.OnNext(4);
        x.OnNext(5);
    }
}
于 2015-12-26T17:41:04.427 に答える