0

リアクティブ拡張では、

IObservable<T> Switch(this IObservable<IObservable<T>> This)

の実装を希望します

IObserver<T> Switch(this IObservable<IObserver<T>> This)

これにより、発信イベントが別のオブザーバーに切り替えられますが、単一のオブザーバーとして提示されます。

4

2 に答える 2

3

このバージョンは、いくつかの問題を処理します。

  • イベントが失われる可能性のある競合状態があります。ソースオブザーバブルが別のスレッドで新しいオブザーバーを生成している間に、オブザーバーが1つのスレッドでイベントを監視する場合、同期をまったく使用しないとOnCompleted、他のスレッドが呼び出す直前に、1つのスレッドで現在のオブザーバーをOnNext呼び出すことになる可能性がありますその同じオブザーバー。これにより、イベントが失われます。

  • 上記に関連して、デフォルトでは、オブザーバーはスレッドセーフではありません。オブザーバーへの同時呼び出しは絶対にしないでください。そうしないと、プライマリ Rx コントラクトに違反します。ロックがないと、別のスレッドが同じオブザーバーを呼び出しOnCompletedているときに、サブスクライバーが を呼び出す可能性があります。このような問題は、Synchronized Subject を使用することですぐに解決できます。ただし、前の問題でも同期が必要なので、単純なミューテックスを使用できます。currentObserverOnNext

  • ソースオブザーバブルからサブスクライブを解除する方法が必要です。結果のオブザーバーが完了 (またはエラー) したら、オブザーバーはこれ以上イベントを期待しないように指示されているため、ソースからサブスクライブを解除するのに適した時期だと思います。

コードは次のとおりです。

public static IObserver<T> Switch<T>(this IObservable<IObserver<T>> source)
{
    var mutex = new object();
    var current = Observer.Create<T>(x => {});
    var subscription = source.Subscribe(o =>
    {
        lock (mutex)
        {
           current.OnCompleted();
           current = o;
        }
    });

    return Observer.Create<T>(
        onNext: v =>
        {
            lock(mutex)
            {                
              current.OnNext(v);
            }
        },
        onCompleted: () =>
        {
             subscription.Dispose();
             lock (mutex)
             {
                 current.OnCompleted();
             }
        },
        onError: e =>
        {
             subscription.Dispose();
             lock (mutex)
             {
                 current.OnError(e);
             }
        });
}
于 2013-04-15T19:46:08.090 に答える
1
public static IObserver<T> Switch<T>(this IObservable<IObserver<T>> This)
{
    IObserver<T> currentObserver = Observer.Create<T>(x => { });

    This.Subscribe(o => { currentObserver.OnCompleted(); currentObserver = o; });


    return Observer.Create<T>
        ( onNext: v => currentObserver.OnNext(v)
        , onCompleted: () => currentObserver.OnCompleted()
        , onError: v => currentObserver.OnError(v));
}
于 2013-04-15T06:18:38.407 に答える