0

Subject オブジェクトを理解するのに苦労しています。

次のコードを検討してください。

        var sub = new Subject<int>();
        sub.Subscribe(x => Console.WriteLine(x));    //subscriber #1        
        sub.Subscribe(x => Console.WriteLine(x));    //subscriber #2        
        sub.OnNext(2);

int のサブジェクトを作成しています。OnNext を実行すると、他のサブスクライバー (#1 と #2) が呼び出されます。私が得られないことは、サブジェクトが観察可能でオブザーバーの両方であるオブジェクトを意味することを読んだことですが、これは、OnNextを呼び出すと他のサブスクライバーが呼び出される理由をどのように説明しますか.

サブジェクトの OnNext がそれをすべてのサブスクライバーに伝播するかどうかは理解できますが、他のすべての人に公開する (これは理にかなっています)。

以下のコードから、OnNext(2) が他のサブスクリプションに伝播する正確な理由を誰かが理解できるでしょうか? (#1、#2)?

public seal class Subject : ISubject, ISubject, IObserver, IObservable, IDisposable { // Fields private volatile IObserver _observer;

// Methods
public Subject()
{
    this._observer = NopObserver<T>.Instance;
}

public void Dispose()
{
    this._observer = DisposedObserver<T>.Instance;
}

public void OnCompleted()
{
    IObserver<T> comparand = null;
    IObserver<T> completed = DoneObserver<T>.Completed;
    do
    {
        comparand = this._observer;
    }
    while (((comparand != DisposedObserver<T>.Instance) && !(comparand is DoneObserver<T>)) && (Interlocked.CompareExchange<IObserver<T>>(ref this._observer, completed, comparand) != comparand));
    comparand.OnCompleted();
}

public void OnError(Exception error)
{
    if (error == null)
    {
        throw new ArgumentNullException("error");
    }
    IObserver<T> comparand = null;
    DoneObserver<T> observer3 = new DoneObserver<T> {
        Exception = error
    };
    DoneObserver<T> observer2 = observer3;
    do
    {
        comparand = this._observer;
    }
    while (((comparand != DisposedObserver<T>.Instance) && !(comparand is DoneObserver<T>)) && (Interlocked.CompareExchange<IObserver<T>>(ref this._observer, observer2, comparand) != comparand));
    comparand.OnError(error);
}

public void OnNext(T value)
{
    this._observer.OnNext(value);
}

public IDisposable Subscribe(IObserver<T> observer)
{
    if (observer == null)
    {
        throw new ArgumentNullException("observer");
    }
    IObserver<T> comparand = null;
    IObserver<T> observer3 = null;
    do
    {
        comparand = this._observer;
        if (comparand == DisposedObserver<T>.Instance)
        {
            throw new ObjectDisposedException("");
        }
        if (comparand == DoneObserver<T>.Completed)
        {
            observer.OnCompleted();
            return Disposable.Empty;
        }
        DoneObserver<T> observer4 = comparand as DoneObserver<T>;
        if (observer4 != null)
        {
            observer.OnError(observer4.Exception);
            return Disposable.Empty;
        }
        if (comparand == NopObserver<T>.Instance)
        {
            observer3 = observer;
        }
        else
        {
            Observer<T> observer5 = comparand as Observer<T>;
            if (observer5 != null)
            {
                observer3 = observer5.Add(observer);
            }
            else
            {
                observer3 = new Observer<T>(new ImmutableList<IObserver<T>>(new IObserver<T>[] { comparand, observer }));
            }
        }
    }
    while (Interlocked.CompareExchange<IObserver<T>>(ref this._observer, observer3, comparand) != comparand);
    return new Subscription<T>((Subject<T>) this, observer);
}

private void Unsubscribe(IObserver<T> observer)
{
    IObserver<T> comparand = null;
    IObserver<T> instance = null;
Label_0004:
    comparand = this._observer;
    if ((comparand != DisposedObserver<T>.Instance) && !(comparand is DoneObserver<T>))
    {
        Observer<T> observer4 = comparand as Observer<T>;
        if (observer4 != null)
        {
            instance = observer4.Remove(observer);
        }
        else
        {
            if (comparand != observer)
            {
                return;
            }
            instance = NopObserver<T>.Instance;
        }
        if (Interlocked.CompareExchange<IObserver<T>>(ref this._observer, instance, comparand) != comparand)
        {
            goto Label_0004;
        }
    }
}

// Properties
public bool HasObservers
{
    get
    {
        return (((this._observer != NopObserver<T>.Instance) && !(this._observer is DoneObserver<T>)) && (this._observer != DisposedObserver<T>.Instance));
    }
}

// Nested Types
private class Subscription : IDisposable
{
    // Fields
    private IObserver<T> _observer;
    private Subject<T> _subject;

    // Methods
    public Subscription(Subject<T> subject, IObserver<T> observer)
    {
        this._subject = subject;
        this._observer = observer;
    }

    public void Dispose()
    {
        IObserver<T> observer = Interlocked.Exchange<IObserver<T>>(ref this._observer, null);
        if (observer != null)
        {
            this._subject.Unsubscribe(observer);
            this._subject = null;
        }
    }
}

}

4

2 に答える 2

1

私はそれを知っていますが、私を悩ませているのは、それが意味をなさなかったということです。コードをさらに掘り下げてみると、オブザーバーの内部実装にはより多くのオブザーバーが含まれていることがわかりました。以下を参照してください。

また、OnNextメソッドを確認すると、すべてのオブザーバーを反復処理し、OnNextメソッドを呼び出していることがわかります。

今ではすべてが私にとって理にかなっています。ロジックは理解しましたが、どこに実装されているかわかりませんでした。

internal class Observer<T> : IObserver<T>
{
    private readonly ImmutableList<IObserver<T>> _observers;

    public Observer(ImmutableList<IObserver<T>> observers)
    {
        this._observers = observers;
    }

    internal IObserver<T> Add(IObserver<T> observer)
    {
        return new Observer<T>(this._observers.Add(observer));
    }

    public void OnCompleted()
    {
        foreach (IObserver<T> observer in this._observers.Data)
        {
            observer.OnCompleted();
        }
    }

    public void OnError(Exception error)
    {
        foreach (IObserver<T> observer in this._observers.Data)
        {
            observer.OnError(error);
        }
    }

    public void OnNext(T value)
    {
        foreach (IObserver<T> observer in this._observers.Data)
        {
            observer.OnNext(value);
        }
    }

    internal IObserver<T> Remove(IObserver<T> observer)
    {
        int index = Array.IndexOf<IObserver<T>>(this._observers.Data, observer);
        if (index < 0)
        {
            return this;
        }
        if (this._observers.Data.Length == 2)
        {
            return this._observers.Data[1 - index];
        }
        return new Observer<T>(this._observers.Remove(observer));
    }
}
于 2013-02-10T17:57:18.007 に答える
0

Subject は、サブスクライブできるため、observable です。あなたの例でそれを行います(2人のサブスクライバーをサブスクライブしました)。

次のことができるため、サブジェクトはオブザーバーでもあります。

someObservable.Subscribe(subject);

そうすれば、サブジェクトはイベントを受信し、someObservableサブスクライバーに伝達します。

PSコード内でOnNext()メソッドを自分で呼び出しました。しかし、それはまさにsomeObservable、サブジェクトでサブスクライブするときに行うことです。

于 2013-02-10T15:17:37.023 に答える