1

C# で .NET の RX ライブラリをいじっています。次のコードで「observer.OnCompleted()」メソッドが何もしない理由を誰かに説明してもらえますか:

var observableStream = Observable.Create<CustomMessage>(
       (observer) =>
           {
               CustomMessage cm = new CustomMessage();
               CustomMessage.Subscribe(observer.OnNext);

               return Disposable.Create(
               () =>
                   {
                       Console.WriteLine("Disposing...");
                       CustomMessage.Unsubscribe(observer.OnNext);                          
                       observer.OnCompleted();                //***Nothing happens here***
                   }
               );
       });

    //IObserver.OnException()
    public override void OnException(Exception e)
    {
        Console.WriteLine("Exception occurred - " + e.Message);
    }

    //IObserver.OnComplete()
    public override void OnUnsubscribe()
    {
        Console.WriteLine("Unsubscribed...");            
    }

    //IObserver.OnNext()
    public override void HandleNextMsg(IRVMessage msg)
    {
        Console.WriteLine("Instance received a message");
    }

IDisposable myDisposable = observableStream.Subscribe(HandleNextMsg, OnException, OnUnsubscribe);

//At some later point....
myDisposable.Dispose();

このコードは、CustomMessages のストリームへのサブスクリプションを作成することを目的としています。サブスクリプションを設定するときに、observer.OnNext() メソッドを CustomMessage タイプに登録します。次に、サブスクリプションを破棄するときに、observer.OnNext() の登録を解除します。これはすべて正しく機能します。私の 'HandleNextMsg()' メソッドは、CustomMessage が受信されるたびに呼び出されます。

後でサブスクリプションを終了したいときに「Dispose()」を呼び出すと、次の 2 行が正常に実行されます。

Console.WriteLine("Disposing...");
CustomMessage.Unsubscribe(observer.OnNext); 

その後、CustomMessages を受信しなくなりました。ただし、次の行は実行されますが、何もしません。

observer.OnCompleted(); 

私はそれが次の行を呼び出すことを期待していました:

Console.WriteLine("Unsubscribed...");

ある時点で、オブザーバーと 'OnUnsubscribe' メソッドの間の接続が失われ、何が起こっているのか正確に理解したいと思います。「observer.OnNext()」は正常に登録解除できるのに、「observer.OnCompleted()」は何もしないのはどうしてですか?

ストリームを破棄しているからといって、「OnCompleted()」を呼び出す必要があるわけではないという指摘を受けましたが、それでも機能しない理由を理解したいと思います。

4

2 に答える 2

2

あなたが見ている問題は、Observable.Create渡された関数をラップする方法と、結果としてサブスクライブするオブザーバーによって引き起こされますIObservable。基本的に、流れは次のとおりです。

  • Observable.Create は AnonymousObservable を返します。
  • AnonymousObservable は、Subscribe の AutoDetachObserver でオブザーバーをラップします。
  • AnonymousObservable は、Subscribe から AutoDetachObserver (IDisposable を実装) を返します。
  • AutoDetachObserver.Dispose は停止フラグを設定し元のサブスクライブ関数が返したオブジェクトを破棄します。このフラグにより​​、オブザーバーは OnError および OnCompleted への今後の呼び出しを無視するため、ラップされたオブザーバー メソッドが呼び出されなくなります。

この回答は RX の v1.x に基づいていますが、v2.0 でも変更されていないと思います。

サブスクリプションの終了方法 (OnError、OnCompleted、または Dispose) に関係なく実行する必要があるコードがある場合は、Observable.Finallyをお勧めします。

于 2012-11-26T16:55:55.793 に答える