サブスクライブメソッドとアンサブスクライブメソッドのペアは、構成的ではありません。すべてのオペレーターは、Subscribeに渡されたオブザーバーのディクショナリを保持し、依存するオブザーバブルシーケンスに渡された(オペレーターに渡された)各オブザーバーインスタンスにそれらをマッピングする必要があります。
たとえば、2つのソースに対してMerge演算子を作成することを検討してください。今日、これは次のようになります(textareaコンパイル済み):
static IObservable<T> Merge<T>(IObservable<T> xs, IObservable<T> ys)
{
return Observable.Create<T>(observer =>
{
var n = 2;
var mergeObserver = Observer.Create<T>(
observer.OnNext,
observer.OnError,
() =>
{
// protected by the gate, see use of Synchronize below
if (--n == 0)
observer.OnCompleted();
}
);
var gate = new object();
return new CompositeDisposable(
xs.Synchronize(gate).Subscribe(mergeObserver),
ys.Synchronize(gate).Subscribe(mergeObserver)
);
});
}
ご覧のとおり、シーケンスの構成は、Subscribe呼び出しから返されるIDisposableオブジェクトの構成にもつながります。Observable.Createでは、指定されたオブザーバーに端末メッセージを送信すると、返されたIDisposableを自動的に破棄する多くのことが行われていることに注意してください。この場合、observer.OnErrorとobserver.OnCompletedを呼び出すと、CompositeDisposableで両方のサブスクリプションが破棄されます。(しかし、それはしばらくの間話すのとはまったく異なる主題です。)
以下のコードは、IObservableにSubscribe / Unsubscribeペアが存在することを前提としています(したがって、2つのアクションを持つCreateファクトリメソッドを使用)。
static IObservable<T> Merge<T>(IObservable<T> xs, IObservable<T> ys)
{
var map = new Dictionary<IObserver<T>, IObserver<T>>();
return Observable.Create<T>(
subscribe: observer =>
{
var gate = new object();
var n = 2;
var mergeObserver = Observer.Create<T>(
x =>
{
lock (gate)
observer.OnNext(x);
},
ex =>
{
lock (gate)
observer.OnError(ex);
},
() =>
{
lock (gate)
if (--n == 0)
observer.OnCompleted();
}
);
//
// Using .Synchronize(gate) would be a mess, because then we need to
// keep the two synchronized sequences around as well, such that we
// can call Unsubscribe on those. So, we're "better off" inlining the
// locking code in the observer.
//
// (Or: how composition goes down the drain!)
//
xs.Subscribe(mergeObserver);
ys.Subscribe(mergeObserver);
lock (map)
map[observer] = mergeObserver;
},
unsubscribe: observer =>
{
var mergeObserver = default(IObserver<T>);
lock (map)
map.TryGetValue(observer, out mergeObserver);
if (mergeObserver != null)
{
xs.Unsubscribe(mergeObserver);
ys.Unsubscribe(mergeObserver);
}
}
);
}
これは架空のものであることに注意してください。これ以上のエッジケースについても、OnErrorまたはOnCompletedの呼び出し時にそれ自体をクリーンアップするためにこのCreateがどのように機能するかについても考えていません。また、例としてマージを使用すると、「購読解除」中に気にする他のリソース(スケジューラージョブなど)がないのは幸運です。
お役に立てれば、
-バート(Rxチーム)