5

元の質問

IObservable複数のシーケンスを組み合わせて聴きたいというシナリオがありMergeます。ただし、これらの1つでエラーが発生した場合、他のストリームのすべてをクラッシュさせたり、シーケンスを再サブスクライブしたりしたくありません(これは「永続的な」シーケンスです)。

Retry()これを行うには、マージする前にストリームにaを追加します。

IEnumerable<IObservable<int>> observables = GetObservables();

observables
    .Select(o => o.Retry())
    .Merge()
    .Subscribe(/* Do subscription stuff */);

ただし、これをテストしたいときに問題が発生します。私がテストしたいのは、の1つがを生成する場合でも、他IObservableのsは値を送信でき、処理される必要があるということです。observablesOnError

Subject<int>で2つのsを表す2つのsを使用すると思いIObservableましたobservables。1つはを送信しOnError(new Exception())、もう1つはその後、を送信しOnNext(1)ます。ただし、Subject<int>新しいサブスクリプションの以前のすべての値を再生し(事実上そうRetry()です)、テストを無限ループに変えるようです。

最初のサブスクリプションでエラーが発生し、後で空のシーケンスが発生するマニュアルを作成して解決しようとしましたIObservableが、ハッキーな感じがします。

var i = 0;
var nErrors = 2;
var testErrorObservableWithOneErrorAndThenCompletion = Observable.Create<int>(o => {
    i++;
    if (i < nErrors) {
        return Observable.Throw<int>(new Exception()).Subscribe(o);
    } else {
        return Observable.Empty<int>().Subscribe(o);
    }
});

私は間違った方法で使用Subjectまたは考えていますか?Retry()これについて他に何か考えはありますか?この状況をどのように解決しますか?

アップデート

さて、これが私が望んでいることと考え Retry()ていることの大理石の図です。

o = message, X = error.
------o---o---X
               \
     Retry() -> \---o---o---X
                             \
                   Retry() -> \...

Subject私の問題はおそらく、以前のエラーをすべて再生したいので、フォアテストを使用するための適切なストッククラスがないということです。

アップデート2

Subjectこれは、その値を再生することについて私が何を意味するかを示すテストケースです。冷静にこれを行うと言った場合、私はこの用語を正しく使用していますか?ホットオブザーバブルを作成する方法はわかってSubjectいますが、それでもこの動作は私には「冷たい」と感じます。

var onNext = false;
var subject = new Subject<int>();

subject.Retry().Subscribe(x => onNext = true);
subject.OnError(new Exception());
subject.OnNext(1);

Assert.That(onNext, Is.True);
4

1 に答える 1

4

更新された要件(単に無視したいのではなく、失敗したオブザーバブルを再試行したい)に基づいて、機能するソリューションを考え出すことができます。

まず、コールドオブザーバブル(サブスクリプションごとに再作成)とホットオブザーバブル(サブスクリプションに関係なく存在する)の違いを理解することが重要です。Retry()基になるイベントを再作成する方法がわからないため、ホットオブザーバブルを作成することはできません。つまり、ホットな観測可能なエラーが発生した場合、それは永久に失われます。

SubjectOnNextサブスクライバーがいなくても電話をかけることができ、期待どおりに機能するという意味で、ホットオブザーバブルを作成します。ホットオブザーバブルをコールドオブザーバブルに変換するには、を使用できますObservable.Defer。これには、そのオブザーバブルの「サブスクリプションの作成」ロジックが含まれます。

そうは言っても、これを行うために変更された元のコードは次のとおりです。

var success = new Subject<int>();
var error = new Subject<int>();

var observables = new List<IObservable<int>> { Observable.Defer(() => {success = new Subject<int>(); return success.AsObservable();}), 
                                               Observable.Defer(() => {error = new Subject<int>(); return error.AsObservable();}) };                                            

observables
.Select(o => o.Retry())
.Merge()
.Subscribe(Console.WriteLine, Console.WriteLine, () => Console.WriteLine("done"));

そしてテスト(以前と同様):

success.OnNext(1);
error.OnError(new Exception("test"));
success.OnNext(2);
error.OnNext(-1);
success.OnCompleted();
error.OnCompleted();

そして期待通りの出力:

1
2
-1
done

もちろん、基礎となるオブザーバブルに応じて、この概念を大幅に変更する必要があります。テストに被験者を使用することは、実際に被験者を使用することと同じではありません。

このコメントにも注意したい:

ただし、Subjectは新しいサブスクリプション(事実上Retry())の以前のすべての値を再生し、テストを無限ループに変えるようです。

真実ではありません-Subjectこのように動作しません。サブスクリプションを再作成するという事実に基づいて無限ループを引き起こしているコードの他の側面がありRetry、サブスクリプションはある時点でエラーを作成します。


元の回答(完了用)

問題は、それRetry()があなたがしたいことをしないということです。ここから:

http://msdn.microsoft.com/en-us/library/ff708141(v=vs.92).aspx

ソースの監視可能なシーケンスをretryCount回、または正常に終了するまで繰り返します。

これは、Retry成功してエラーがスローされなくなるまで、基になるオブザーバブルに継続的に再接続しようとすることを意味します。

私の理解では、実際には、オブザーバブルの例外を再試行ではなく無視する必要があります。これは代わりにあなたが望むことをします:

observables
.Select(o => o.Catch((Func<Exception,IObservable<int>>)(e => Observable.Empty<int>())))
.Merge()
.Subscribe(/* subscription code */);

これはCatch、例外を除いてオブザーバブルをトラップし、その時点で空のオブザーバブルに置き換えるために使用します。

科目を使用した完全なテストは次のとおりです。

var success = new Subject<int>();
var error = new Subject<int>();

var observables = new List<IObservable<int>> { success.AsObservable(), error.AsObservable() };

observables
.Select(o => o.Catch((Func<Exception,IObservable<int>>)(e => Observable.Empty<int>())))
.Merge()
.Subscribe(Observer.Create<int>(Console.WriteLine, Console.WriteLine, () => Console.WriteLine("done")));

success.OnNext(1);
error.OnError(new Exception("test"));
success.OnNext(2);
success.OnCompleted();

そして、これは予想通りに生成します:

1
2
done
于 2012-06-11T11:10:15.457 に答える