12

OnNextを使用するときにオブザーバーによってエラーがスローされると、アプリケーションは終了しますObserveOn(Scheduler.ThreadPool)。これに対処するために私が見つけた唯一の方法は、以下のカスタム拡張メソッドを使用することです(OnNextが例外をスローしないようにすることは別として)。そして、それぞれの後に。ObserveOnが続くことを確認しExceptionToErrorます。

    public static IObservable<T> ExceptionToError<T>(this IObservable<T> source) {
        var sub = new Subject<T>();
        source.Subscribe(i => {
            try {
                sub.OnNext(i);
            } catch (Exception err) {
                sub.OnError(err);
            }
        }
            , e => sub.OnError(e), () => sub.OnCompleted());
        return sub;
    }

しかし、これは正しく感じられません。これに対処するためのより良い方法はありますか?

キャッチされない例外が原因で、このプログラムがクラッシュします。

class Program {
    static void Main(string[] args) {
        try {
            var xs = new Subject<int>();

            xs.ObserveOn(Scheduler.ThreadPool).Subscribe(x => {
                Console.WriteLine(x);
                if (x % 5 == 0) {
                    throw new System.Exception("Bang!");
                }
            }, ex => Console.WriteLine("Caught:" + ex.Message)); // <- not reached

            xs.OnNext(1);
            xs.OnNext(2);
            xs.OnNext(3);
            xs.OnNext(4);
            xs.OnNext(5);
        } catch (Exception e) {
            Console.WriteLine("Caught : " + e.Message); // <- also not reached
        } finally {

            Console.ReadKey();
        }
    }
}
4

5 に答える 5

14

RCリリース以降、Rxv2.0でこの問題に対処しています。それについては、http://blogs.msdn.com/rxteamのブログですべて読むことができます。基本的には、パイプライン自体でのより統制のとれたエラー処理と、SubscribeSafe拡張メソッド(サブスクリプション中のエラーをOnErrorチャネルにリダイレクトするため)およびISchedulerでのCatch拡張メソッド(スケジュールされた例外処理ロジックでスケジューラーをラップするため)と組み合わせます。行動)。

ここで提案するExceptionToErrorメソッドに関しては、1つの欠陥があります。IDisposableサブスクリプションオブジェクトは、コールバックの実行時にnullになる可能性があります。基本的な競合状態があります。これを回避するには、SingleAssignmentDisposableを使用する必要があります。

于 2012-06-25T09:16:45.150 に答える
6

サブスクリプションのエラーとオブザーバブルのエラーには違いがあります。簡単なテスト:

var xs = new Subject<int>();

xs.Subscribe(x => { Console.WriteLine(x); if (x % 3 == 0) throw new System.Exception("Error in subscription"); }, 
             ex => Console.WriteLine("Error in source: " + ex.Message));

これで実行すると、ソースで適切に処理されたエラーが発生します。

xs.OnNext(1);
xs.OnNext(2);
xs.OnError(new Exception("from source"));

これで実行すると、サブスクリプションで未処理のエラーが発生します。

xs.OnNext(1);
xs.OnNext(2);
xs.OnNext(3);

ソリューションが行ったことは、サブスクリプションでエラーを取得し、ソースでエラーにすることです。そして、サブスクリプションごとではなく、元のストリームでこれを実行しました。あなたはこれをするつもりだったかもしれないし、しなかったかもしれませんが、それはほぼ間違いなく間違っています。

それを行う「正しい」方法は、必要なエラー処理を、それが属するサブスクライブアクションに直接追加することです。サブスクリプション関数を直接変更したくない場合は、小さなヘルパーを使用できます。

public static Action<T> ActionAndCatch<T>(Action<T> action, Action<Exception> catchAction)
{
    return item =>
    {
        try { action(item); }
        catch (System.Exception e) { catchAction(e); }
    };
}

そして今それを使用するために、再び異なるエラー間の違いを示します:

xs.Subscribe(ActionAndCatch<int>(x => { Console.WriteLine(x); if (x % 3 == 0) throw new System.Exception("Error in subscription"); },
                                 ex => Console.WriteLine("Caught error in subscription: " + ex.Message)),
             ex => Console.WriteLine("Error in source: " + ex.Message));

これで、ソースのエラーとサブスクリプションのエラーを(別々に)処理できるようになりました。もちろん、これらのアクションはいずれもメソッドで定義できるため、上記のコードは(潜在的に)次のように単純になります。

xs.Subscribe(ActionAndCatch(Handler, ExceptionHandler), SourceExceptionHandler);

編集

コメントでは、サブスクリプションのエラーがストリーム自体のエラーを指しているという事実について説明し始めました。そのストリームに他のサブスクライバーは必要ありません。これはまったく異なるタイプの問題です。Validateこのシナリオを処理するために、観察可能な拡張機能を作成する傾向があります。

public static IObservable<T> Validate<T>(this IObservable<T> source, Predicate<T> valid)
{
    return Observable.Create<T>(o => {
        return source.Subscribe(
            x => {
                if (valid(x)) o.OnNext(x);
                else       o.OnError(new Exception("Could not validate: " + x));
            }, e => o.OnError(e), () => o.OnCompleted()
        );
    });
}

次に、比喩を混ぜることなく、簡単に使用できます(ソースのみのエラー):

xs
.Validate(x => x != 3)
.Subscribe(x => Console.WriteLine(x),
             ex => Console.WriteLine("Error in source: " + ex.Message));

それでも例外を抑制したい場合Subscribeは、他の説明されている方法の1つを使用する必要があります。

于 2012-06-25T03:07:33.753 に答える
4

現在のソリューションは理想的ではありません。ここでRxの人々の一人が述べたように:

Rx演算子は、OnNext、OnError、またはOnCompletedの呼び出しで発生する例外をキャッチしません。これは、(1)オブザーバーの実装者がそれらの例外を処理する方法を最もよく知っており、それらに対して合理的なことは何もできないこと、および(2)例外が発生した場合は、Rxによって処理されないようにバブルアウトすることを期待しているためです。 。

現在のソリューションでは、IObserverによってスローされたエラーを処理するためにIObservableを取得します。これは、意味的にはIObservableがそれを監視しているものについての知識を持っていないはずなので意味がありません。次の例を考えてみましょう。

var errorFreeSource = new Subject<int>();
var sourceWithExceptionToError = errorFreeSource.ExceptionToError();
var observerThatThrows = Observer.Create<int>(x =>
  {
      if (x % 5 == 0)
          throw new Exception();
  },
  ex => Console.WriteLine("There's an argument that this should be called"),
  () => Console.WriteLine("OnCompleted"));
var observerThatWorks = Observer.Create<int>(
    x => Console.WriteLine("All good"),
    ex => Console.WriteLine("But definitely not this"),
    () => Console.WriteLine("OnCompleted"));
sourceWithExceptionToError.Subscribe(observerThatThrows);
sourceWithExceptionToError.Subscribe(observerThatWorks);
errorFreeSource.OnNext(1);
errorFreeSource.OnNext(2);
errorFreeSource.OnNext(3);
errorFreeSource.OnNext(4);
errorFreeSource.OnNext(5);
Console.ReadLine();

ここでは、ソースまたはobserverThatWorksに問題はありませんが、別のオブザーバーとの無関係なエラーのために、そのOnErrorが呼び出されます。別のスレッドの例外がプロセスを終了しないようにするには、そのスレッドで例外をキャッチする必要があるため、オブザーバーにtry/catchブロックを配置します。

于 2012-06-25T04:31:05.863 に答える
1

SubscribeSafeこの問題を解決するはずのネイティブメソッドを調べましたが、機能させることができません。IObserver<T>このメソッドには、 :を受け入れる単一のオーバーロードがあります。

// Subscribes to the specified source, re-routing synchronous exceptions during
// invocation of the IObservable<T>.Subscribe(IObserver<T>) method to the
// observer's IObserver<T>.OnError(Exception) channel. This method is typically
// used when writing query operators.
public static IDisposable SubscribeSafe<T>(this IObservable<T> source,
    IObserver<T> observer);

Observer.Createファクトリメソッドによって作成されたオブザーバーを渡そうとしましたが、onNextハンドラーの例外は、通常の場合と同じように、プロセス¹をクラッシュさせ続けますSubscribe。だから私は自分のバージョンのを書くことになったSubscribeSafe。これは、引数として3つのハンドラーを受け入れ、ハンドラーによってスローされた例外onNextonCompletedハンドラーにファネルしますonError

/// <summary>Subscribes an element handler, an error handler, and a completion
/// handler to an observable sequence. Any exceptions thrown by the element or
/// the completion handler are propagated through the error handler.</summary>
public static IDisposable SubscribeSafe<T>(this IObservable<T> source,
    Action<T> onNext, Action<Exception> onError, Action onCompleted)
{
    // Arguments validation omitted
    var disposable = new SingleAssignmentDisposable();
    disposable.Disposable = source.Subscribe(
        value =>
        {
            try { onNext(value); } catch (Exception ex) { onError(ex); disposable.Dispose(); }
        }, onError, () =>
        {
            try { onCompleted(); } catch (Exception ex) { onError(ex); }
        }
    );
    return disposable;
}

ハンドラーで未処理の例外がonError発生すると、プロセスがクラッシュすることに注意してください。

¹ハンドラーがで非同期に呼び出された場合にスローされる例外のみThreadPool

于 2020-11-30T07:52:41.157 に答える
-1

あなたは正しいです-それは気分が悪いはずです。このような主題を使用して返すことは、良い方法ではありません。

少なくとも、次のようにこのメソッドを実装する必要があります。

public static IObservable<T> ExceptionToError<T>(this IObservable<T> source)
{
    return Observable.Create<T>(o =>
    {
        var subscription = (IDisposable)null;
        subscription = source.Subscribe(x =>
        {
            try
            {
                o.OnNext(x);
            }
            catch (Exception ex)
            {
                o.OnError(ex);
                subscription.Dispose();
            }
        }, e => o.OnError(e), () => o.OnCompleted());
        return subscription;
    });
}

サブジェクトが使用されていないことに注意してください。エラーが発生した場合は、シーケンスがエラーを超えて継続しないように、サブスクリプションを破棄します。

ただし、OnErrorサブスクリプションにハンドラーを追加するだけではどうでしょうか。このようなビット:

var xs = new Subject<int>();

xs.ObserveOn(Scheduler.ThreadPool).Subscribe(x =>
{
    Console.WriteLine(x);
    if (x % 5 == 0)
    {
        throw new System.Exception("Bang!");
    }
}, ex => Console.WriteLine(ex.Message));

xs.OnNext(1);
xs.OnNext(2);
xs.OnNext(3);
xs.OnNext(4);
xs.OnNext(5);

このコードは、サブスクリプションでエラーを適切にキャッチします。

もう1つの方法は、Materialize拡張方法を使用することですが、上記の解決策が機能しない場合を除いて、それは少しやり過ぎかもしれません。

于 2012-06-25T01:11:45.553 に答える