10

ある種の処理中のメッセージバスにリアクティブ拡張機能を使用しています。実装は非常に簡単です。

登録は次のようになります

public IDisposable Register<T>(Action<T> action) where T : IMessage
{
   return this.subject
        .OfType<T>()
        .Subscribe(action);
}

そして、簡単に送信します。

private void SendMessage(IMessage message)
    {
        this.subject.OnNext(message);
    }

ただし、RXの例外動作に問題があります。登録/サブスクライブされたアクションで1つの例外がスローされます。つまり、Observableの「ストリーム」が壊れており、サブスクライブしなくなります。このメッセージバスは、アプリケーションの2つの部分で通信に使用されます。予期しない例外がスローされた場合でも、このようなストリームが中断されないようにする必要があります。

4

2 に答える 2

13

ストリームが例外によって中断されないようにする必要がある場合は、例外用に別のチャネルが必要です。

この動作は完全に予想外というわけではありません。IObserver<T>インターフェイスのドキュメントから:

OnError メソッド。通常、データが利用できない、アクセスできない、または破損していること、またはプロバイダーでその他のエラー状態が発生したことを示すために、プロバイダーによって呼び出されます。

これを考えると、ストリームが利用できない、破損しているなどの場合、ストリームを「障害」にする必要があります (これは、WCF で障害が発生しているチャネルに似ています)。IObservable<T>状態は不確定であるため、実装から得られる他のものに依存することはできません。では、それ以上の観測があることを期待する必要があるのはなぜですか?

とはいえ、いくつかのオプションがあります。

例外を飲み込む

次のように、関数にaction渡すデリゲートをラップする必要があります。Register

public IDisposable Register<T>(Action<T> action) where T : IMessage
{
   return this.subject
        .OfType<T>()
        .Subscribe(t => {
            // Execute action
            try { action(t); }
            catch { }
         });
}

もちろん、これは望ましくないかもしれません。プログラムに影響を与える例外を破棄する可能性があるため (または、ここで何が起こっているかを正確に知っていて、それらをスキップしたい場合もあります)、次のものに基づいて構築するために使用できます。解決。

例外がスローされたときに実行するアクションを提供する

上記をベースとして使用するAction<T, Exception>と、例外がスローされたときに呼び出される を要求できます。

public IDisposable Register<T>(Action<T> action, 
    Action<T, Exception> errorHandler) where T : IMessage
{
   return this.subject
        .OfType<T>()
        .Subscribe(t => {
            // Execute action
            try { action(t); }
            catch (Exception e) { errorHandler(t, e); }
         });
}

これで、 から例外がスローactionされると、ストリームを中断することなく例外ハンドラーに渡されます。

上記は簡単にオーバーロードして、例外を飲み込む動作を提供できます(これも、目的に役立つ場合とそうでない場合があります)。

public IDisposable Register<T>(Action<T> action) where T : IMessage
{
    // Call the overload, don't do anything on
    // exception.
    return Register(action, (t, e) => { });
}
于 2012-11-21T14:45:53.487 に答える
1

基本的に、Subscribe メソッドで OnError デリゲートを提供できます。この問題を考慮した小さなチュートリアル (例外処理) を最近ブログに書きました: http://blog.andrei.rinea.ro/2013/06/01/bing-it-on-reactive- extensions-story-code-and-slides/

OnError メソッドを見てください。Rx フレームワークは、エラーが発生したときにシーケンスが壊れることを前提に構築されているため、基本的には再サブスクライブする必要があります。

于 2013-06-01T20:58:54.603 に答える