リアクティブ拡張では、
IObservable<T> Switch(this IObservable<IObservable<T>> This)
の実装を希望します
IObserver<T> Switch(this IObservable<IObserver<T>> This)
これにより、発信イベントが別のオブザーバーに切り替えられますが、単一のオブザーバーとして提示されます。
リアクティブ拡張では、
IObservable<T> Switch(this IObservable<IObservable<T>> This)
の実装を希望します
IObserver<T> Switch(this IObservable<IObserver<T>> This)
これにより、発信イベントが別のオブザーバーに切り替えられますが、単一のオブザーバーとして提示されます。
このバージョンは、いくつかの問題を処理します。
イベントが失われる可能性のある競合状態があります。ソースオブザーバブルが別のスレッドで新しいオブザーバーを生成している間に、オブザーバーが1つのスレッドでイベントを監視する場合、同期をまったく使用しないとOnCompleted
、他のスレッドが呼び出す直前に、1つのスレッドで現在のオブザーバーをOnNext
呼び出すことになる可能性がありますその同じオブザーバー。これにより、イベントが失われます。
上記に関連して、デフォルトでは、オブザーバーはスレッドセーフではありません。オブザーバーへの同時呼び出しは絶対にしないでください。そうしないと、プライマリ Rx コントラクトに違反します。ロックがないと、別のスレッドが同じオブザーバーを呼び出しOnCompleted
ているときに、サブスクライバーが を呼び出す可能性があります。このような問題は、Synchronized Subject を使用することですぐに解決できます。ただし、前の問題でも同期が必要なので、単純なミューテックスを使用できます。currentObserver
OnNext
ソースオブザーバブルからサブスクライブを解除する方法が必要です。結果のオブザーバーが完了 (またはエラー) したら、オブザーバーはこれ以上イベントを期待しないように指示されているため、ソースからサブスクライブを解除するのに適した時期だと思います。
コードは次のとおりです。
public static IObserver<T> Switch<T>(this IObservable<IObserver<T>> source)
{
var mutex = new object();
var current = Observer.Create<T>(x => {});
var subscription = source.Subscribe(o =>
{
lock (mutex)
{
current.OnCompleted();
current = o;
}
});
return Observer.Create<T>(
onNext: v =>
{
lock(mutex)
{
current.OnNext(v);
}
},
onCompleted: () =>
{
subscription.Dispose();
lock (mutex)
{
current.OnCompleted();
}
},
onError: e =>
{
subscription.Dispose();
lock (mutex)
{
current.OnError(e);
}
});
}
public static IObserver<T> Switch<T>(this IObservable<IObserver<T>> This)
{
IObserver<T> currentObserver = Observer.Create<T>(x => { });
This.Subscribe(o => { currentObserver.OnCompleted(); currentObserver = o; });
return Observer.Create<T>
( onNext: v => currentObserver.OnNext(v)
, onCompleted: () => currentObserver.OnCompleted()
, onError: v => currentObserver.OnError(v));
}