元の質問
IObservable
複数のシーケンスを組み合わせて聴きたいというシナリオがありMerge
ます。ただし、これらの1つでエラーが発生した場合、他のストリームのすべてをクラッシュさせたり、シーケンスを再サブスクライブしたりしたくありません(これは「永続的な」シーケンスです)。
Retry()
これを行うには、マージする前にストリームにaを追加します。
IEnumerable<IObservable<int>> observables = GetObservables();
observables
.Select(o => o.Retry())
.Merge()
.Subscribe(/* Do subscription stuff */);
ただし、これをテストしたいときに問題が発生します。私がテストしたいのは、の1つがを生成する場合でも、他IObservable
のsは値を送信でき、処理される必要があるということです。observables
OnError
Subject<int>
で2つのsを表す2つのsを使用すると思いIObservable
ましたobservables
。1つはを送信しOnError(new Exception())
、もう1つはその後、を送信しOnNext(1)
ます。ただし、Subject<int>
新しいサブスクリプションの以前のすべての値を再生し(事実上そうRetry()
です)、テストを無限ループに変えるようです。
最初のサブスクリプションでエラーが発生し、後で空のシーケンスが発生するマニュアルを作成して解決しようとしましたIObservable
が、ハッキーな感じがします。
var i = 0;
var nErrors = 2;
var testErrorObservableWithOneErrorAndThenCompletion = Observable.Create<int>(o => {
i++;
if (i < nErrors) {
return Observable.Throw<int>(new Exception()).Subscribe(o);
} else {
return Observable.Empty<int>().Subscribe(o);
}
});
私は間違った方法で使用Subject
または考えていますか?Retry()
これについて他に何か考えはありますか?この状況をどのように解決しますか?
アップデート
さて、これが私が望んでいることと考え Retry()
ていることの大理石の図です。
o = message, X = error.
------o---o---X
\
Retry() -> \---o---o---X
\
Retry() -> \...
Subject
私の問題はおそらく、以前のエラーをすべて再生したいので、フォアテストを使用するための適切なストッククラスがないということです。
アップデート2
Subject
これは、その値を再生することについて私が何を意味するかを示すテストケースです。冷静にこれを行うと言った場合、私はこの用語を正しく使用していますか?ホットオブザーバブルを作成する方法はわかってSubject
いますが、それでもこの動作は私には「冷たい」と感じます。
var onNext = false;
var subject = new Subject<int>();
subject.Retry().Subscribe(x => onNext = true);
subject.OnError(new Exception());
subject.OnNext(1);
Assert.That(onNext, Is.True);