James と Richard の両方がいくつかの良い点を指摘しましたが、問題を解決するための最良の方法を提供したとは思いません。
James は を使用することを提案し.Catch(Observable.Never<Unit>())
ました。彼が「... ストリームを続行できるようにする」と言ったとき、彼は間違っていました。なぜなら、例外に遭遇したら、ストリームを終了しなければならないからです。これは、Richard がオブザーバーとオブザーバブルの間の契約について言及したときに指摘したことです。
また、Never
この方法で使用すると、オブザーバブルが完了しなくなります。
手短に言えば、それ.Catch(Observable.Empty<Unit>())
がシーケンスをエラーで終了するシーケンスから完了で終了するシーケンスに変更する正しい方法です。
を使用してソース コレクションの各値を処理し、各例外をキャッチできるようにするという正しいアイデアにたどり着きましたSelectMany
が、いくつかの問題が残っています。
関数呼び出しをオブザーバブルに変えるためだけにタスク (TPL) を使用しています。これは、オブザーバブルがタスク プール スレッドを使用することを強制します。これは、SelectMany
ステートメントが非決定論的な順序で値を生成する可能性が高いことを意味します。
また、データを処理するための実際の呼び出しを非表示にして、リファクタリングとメンテナンスを困難にします。
例外をスキップできる拡張メソッドを作成したほうがよいと思います。ここにあります:
public static IObservable<R> SelectAndSkipOnException<T, R>(
this IObservable<T> source, Func<T, R> selector)
{
return
source
.Select(t =>
Observable.Start(() => selector(t)).Catch(Observable.Empty<R>()))
.Merge();
}
このメソッドを使用すると、これを簡単に実行できます。
var result =
collection.ToObservable()
.SelectAndSkipOnException(t =>
{
var a = DoA(t);
var b = DoB(a);
var c = DoC(b);
return c;
});
このコードははるかに単純ですが、例外が隠されています。シーケンスを継続させながら例外に固執したい場合は、さらにファンキーにする必要があります。拡張メソッドにいくつかのオーバーロードを追加するとMaterialize
、エラーが保持されます。
public static IObservable<Notification<R>> Materialize<T, R>(
this IObservable<T> source, Func<T, R> selector)
{
return source.Select(t => Notification.CreateOnNext(t)).Materialize(selector);
}
public static IObservable<Notification<R>> Materialize<T, R>(
this IObservable<Notification<T>> source, Func<T, R> selector)
{
Func<Notification<T>, Notification<R>> f = nt =>
{
if (nt.Kind == NotificationKind.OnNext)
{
try
{
return Notification.CreateOnNext<R>(selector(nt.Value));
}
catch (Exception ex)
{
ex.Data["Value"] = nt.Value;
ex.Data["Selector"] = selector;
return Notification.CreateOnError<R>(ex);
}
}
else
{
if (nt.Kind == NotificationKind.OnError)
{
return Notification.CreateOnError<R>(nt.Exception);
}
else
{
return Notification.CreateOnCompleted<R>();
}
}
};
return source.Select(nt => f(nt));
}
これらのメソッドを使用すると、次のように記述できます。
var result =
collection
.ToObservable()
.Materialize(t =>
{
var a = DoA(t);
var b = DoB(a);
var c = DoC(b);
return c;
})
.Do(nt =>
{
if (nt.Kind == NotificationKind.OnError)
{
/* Process the error in `nt.Exception` */
}
})
.Where(nt => nt.Kind != NotificationKind.OnError)
.Dematerialize();
Materialize
これらのメソッドをチェーンしてex.Data["Value"]
&ex.Data["Selector"]
を使用して、エラーをスローした値とセレクター関数を取得することもできます。
これが役立つことを願っています。