初心者の場合、通常、独自のオブジェクトを実装するべきではありませIObservableんIObserver。あなたの質問は、その理由を示唆しています。根本的な動作を正しくすることは非常に困難です。
これで、を呼び出すたびに。がIObservable.Subscribe返されますIDisposable。これは、またはが呼び出されるSubscribe前に、の呼び出し元がオブザーバブルのサブスクライブを解除したい場合に使用されます。ただし、または が呼び出された場合、は自動的に破棄されます。したがって、効果的にRxは、監視可能なコレクションが完了すると、自動的にクリーンアップします。OnCompletedOnErrorOnCompletedOnErrorIDisposable
個々のオブザーバーは、それ自体のサブスクリプションの有効期間を管理する必要はありません。オブザーバーはOnCompleted/OnErrorメッセージに応答するだけで済みます。
あなたのコードでは、コードを少し変更することを検討することをお勧めします。CommandReaderPublisherメソッドを持つクラスの方がクラスSubscribeよりも適切かもしれないと思いCommandReaderます。Rxストリームが完了すると、それを継続して使用することはできません。
また、基になるストリームが閉じるときよりも呼び出しのOnCompleted方が良いのではないかと思います。OnError(exception)エラーが発生した場合は問題ありませんが、エラーが発生した場合は問題がONCompletedない可能性があります。