元の質問
IObservable複数のシーケンスを組み合わせて聴きたいというシナリオがありMergeます。ただし、これらの1つでエラーが発生した場合、他のストリームのすべてをクラッシュさせたり、シーケンスを再サブスクライブしたりしたくありません(これは「永続的な」シーケンスです)。
Retry()これを行うには、マージする前にストリームにaを追加します。
IEnumerable<IObservable<int>> observables = GetObservables();
observables
.Select(o => o.Retry())
.Merge()
.Subscribe(/* Do subscription stuff */);
ただし、これをテストしたいときに問題が発生します。私がテストしたいのは、の1つがを生成する場合でも、他IObservableのsは値を送信でき、処理される必要があるということです。observablesOnError
Subject<int>で2つのsを表す2つのsを使用すると思いIObservableましたobservables。1つはを送信しOnError(new Exception())、もう1つはその後、を送信しOnNext(1)ます。ただし、Subject<int>新しいサブスクリプションの以前のすべての値を再生し(事実上そうRetry()です)、テストを無限ループに変えるようです。
最初のサブスクリプションでエラーが発生し、後で空のシーケンスが発生するマニュアルを作成して解決しようとしましたIObservableが、ハッキーな感じがします。
var i = 0;
var nErrors = 2;
var testErrorObservableWithOneErrorAndThenCompletion = Observable.Create<int>(o => {
i++;
if (i < nErrors) {
return Observable.Throw<int>(new Exception()).Subscribe(o);
} else {
return Observable.Empty<int>().Subscribe(o);
}
});
私は間違った方法で使用Subjectまたは考えていますか?Retry()これについて他に何か考えはありますか?この状況をどのように解決しますか?
アップデート
さて、これが私が望んでいることと考え Retry()ていることの大理石の図です。
o = message, X = error.
------o---o---X
\
Retry() -> \---o---o---X
\
Retry() -> \...
Subject私の問題はおそらく、以前のエラーをすべて再生したいので、フォアテストを使用するための適切なストッククラスがないということです。
アップデート2
Subjectこれは、その値を再生することについて私が何を意味するかを示すテストケースです。冷静にこれを行うと言った場合、私はこの用語を正しく使用していますか?ホットオブザーバブルを作成する方法はわかってSubjectいますが、それでもこの動作は私には「冷たい」と感じます。
var onNext = false;
var subject = new Subject<int>();
subject.Retry().Subscribe(x => onNext = true);
subject.OnError(new Exception());
subject.OnNext(1);
Assert.That(onNext, Is.True);