12

エラーが発生した場合、シーケンス内の次の要素で実行を再開する監視可能なシーケンスを持つ方法はありますか? この投稿から、実行を再開するには Catch() で新しいオブザーバブル シーケンスを指定する必要があるように見えますが、代わりにシーケンスの次の要素で処理を続行する必要がある場合はどうでしょうか? これを達成する方法はありますか?

更新: シナリオは次のとおりです。処理する必要がある要素がたくさんあります。処理は一連のステップで構成されています。ステップを構成したいタスクに分解しました。ここに投稿された ToObservable() のガイドラインに従って、 タスクごとにコンポジション用のオブザーバブルに変換しました。基本的に私はそのようなことをしています -

foreach(element in collection)
{
   var result = from aResult in DoAAsync(element).ToObservable() 
         from bResult in DoBAsync(aResult).ToObservable() 
         from cResult in DoCAsync(bResult).ToObservable() 
         select cResult;
   result.subscribe( register on next and error handlers here)
 }

または私はこのようなことができます:

var result = 
        from element in collection.ToObservable() 
        from aResult in DoAAsync(element).ToObservable() 
         from bResult in DoBAsync(aResult).ToObservable() 
         from cResult in DoCAsync(bResult).ToObservable() 
         select cResult;

要素の1つの処理が例外をスローしたとしましょう. エラーをログに記録して、理想的に進めたいと思います。

4

2 に答える 2

12

James と Richard の両方がいくつかの良い点を指摘しましたが、問題を解決するための最良の方法を提供したとは思いません。

James は を使用することを提案し.Catch(Observable.Never<Unit>())ました。彼が「... ストリームを続行できるようにする」と言ったとき、彼は間違っていました。なぜなら、例外に遭遇したら、ストリームを終了しなければならないからです。これは、Richard がオブザーバーとオブザーバブルの間の契約について言及したときに指摘したことです。

また、Neverこの方法で使用すると、オブザーバブルが完了しなくなります。

手短に言えば、それ.Catch(Observable.Empty<Unit>())がシーケンスをエラーで終了するシーケンスから完了で終了するシーケンスに変更する正しい方法です。

を使用してソース コレクションの各値を処理し、各例外をキャッチできるようにするという正しいアイデアにたどり着きましたSelectManyが、いくつかの問題が残っています。

関数呼び出しをオブザーバブルに変えるためだけにタスク (TPL) を使用しています。これは、オブザーバブルがタスク プール スレッドを使用することを強制します。これは、SelectManyステートメントが非決定論的な順序で値を生成する可能性が高いことを意味します。

また、データを処理するための実際の呼び出しを非表示にして、リファクタリングとメンテナンスを困難にします。

例外をスキップできる拡張メソッドを作成したほうがよいと思います。ここにあります:

public static IObservable<R> SelectAndSkipOnException<T, R>(
    this IObservable<T> source, Func<T, R> selector)
{
    return
        source
            .Select(t =>
                Observable.Start(() => selector(t)).Catch(Observable.Empty<R>()))
            .Merge();
}

このメソッドを使用すると、これを簡単に実行できます。

var result =
    collection.ToObservable()
        .SelectAndSkipOnException(t =>
        {
            var a = DoA(t);
            var b = DoB(a);
            var c = DoC(b);
            return c;
        });

このコードははるかに単純ですが、例外が隠されています。シーケンスを継続させながら例外に固執したい場合は、さらにファンキーにする必要があります。拡張メソッドにいくつかのオーバーロードを追加するとMaterialize、エラーが保持されます。

public static IObservable<Notification<R>> Materialize<T, R>(
    this IObservable<T> source, Func<T, R> selector)
{
    return source.Select(t => Notification.CreateOnNext(t)).Materialize(selector);
}

public static IObservable<Notification<R>> Materialize<T, R>(
    this IObservable<Notification<T>> source, Func<T, R> selector)
{
    Func<Notification<T>, Notification<R>> f = nt =>
    {
        if (nt.Kind == NotificationKind.OnNext)
        {
            try
            {
                return Notification.CreateOnNext<R>(selector(nt.Value));
            }
            catch (Exception ex)
            {
                ex.Data["Value"] = nt.Value;
                ex.Data["Selector"] = selector;
                return Notification.CreateOnError<R>(ex);
            }
        }
        else
        {
            if (nt.Kind == NotificationKind.OnError)
            {
                return Notification.CreateOnError<R>(nt.Exception);
            }
            else
            {
                return Notification.CreateOnCompleted<R>();
            }
        }
    };
    return source.Select(nt => f(nt));
}

これらのメソッドを使用すると、次のように記述できます。

var result =
    collection
        .ToObservable()
        .Materialize(t =>
        {
            var a = DoA(t);
            var b = DoB(a);
            var c = DoC(b);
            return c;
        })
        .Do(nt =>
        {
            if (nt.Kind == NotificationKind.OnError)
            {
                /* Process the error in `nt.Exception` */
            }
        })
        .Where(nt => nt.Kind != NotificationKind.OnError)
        .Dematerialize();

Materializeこれらのメソッドをチェーンしてex.Data["Value"]&ex.Data["Selector"]を使用して、エラーをスローした値とセレクター関数を取得することもできます。

これが役立つことを願っています。

于 2011-07-26T04:52:21.023 に答える
1

IObservableとの間の契約はIObserverOnNext*(OnCompelted|OnError)?ソースによってでなくても、すべてのオペレーターによって支持されます。

唯一の選択肢は、 を使用してソースを再サブスクライブすることですが、ソースがすべての説明に対してインスタンスRetryを返す場合、新しい値は表示されません。IObservable

シナリオに関する詳細情報を提供していただけますか? もしかしたら、別の見方もあるかもしれません。

編集:更新されたフィードバックに基づいて、次のものが必要なようですCatch:

var result = 
    from element in collection.ToObservable() 
    from aResult in DoAAsync(element).ToObservable().Log().Catch(Observable.Empty<TA>())
    from bResult in DoBAsync(aResult).ToObservable().Log().Catch(Observable.Empty<TB>()) 
    from cResult in DoCAsync(bResult).ToObservable().Log().Catch(Observable.Empty<TC>())
    select cResult;

Emptyこれは、エラーを次のシーケンスをトリガーしないに置き換えます(SelectManyフードの下で使用するため)。

于 2011-05-19T07:39:27.523 に答える