Ollie Riches のブログ投稿Trying to be more Functional with Rxから抜粋した以下を読み、著者と同じように疑問に思うようになりました: なぜOnCompletedが渡されないのですか? 誰かがここで何が起こっているか教えてもらえますか? おそらく、恥ずかしいほど単純なことでしょうか?
便宜上、コードを少し変更してここに再掲します (ここで彼のコードをリッピングすることが受け入れられない場合は、Ollie に謝罪します)。
public static class RxExtensions
{
public static IObservable<T> Suspendable<T>(this IObservable<T> stream, IObservable<bool> suspend, bool initialState = false)
{
return Observable.Create<T>(o =>
{
var disposable = suspend.StartWith(initialState)
.DistinctUntilChanged()
.Select(s => s ? Observable.Empty<T>() : stream)
.Switch()
.Subscribe(o);
return disposable;
});
}
}
var testScheduler = new TestScheduler();
var generatorCount = 10;
//If the limit will be hardcoded to something less than generatorCount, an exception will be
//thrown and the exception object will be set. Why it doesn't happen to completed in the following?
var generator = Observable.Generate(1,
x => x <= generatorCount,
x => x + 1,
x => { if(x != 11) { Console.WriteLine(x); return x; } else { throw new ArgumentException(); } },
x => TimeSpan.FromSeconds(1),
testScheduler);
Exception exception = null;
var completed = false;
generator.Suspendable(new Subject<bool>()).Subscribe(_ => { }, e => exception = e, () => completed = true);
testScheduler.AdvanceBy(TimeSpan.FromMilliseconds(1001000).Ticks);
Console.WriteLine(exception);
Console.WriteLine(completed);
記録のために、一時停止したストリームはイベントを蓄積し、一時停止したストリームはイベントをスキップするという区別で、一時停止および停止できるストリームを作成しようと考えていました。特に一時停止ビットに制限または「戦略を保存」することを考えている場合は、予想よりも少し複雑に見え始めました. しかたがない...
<編集:興味深いことに、Pausable のRxJS 実装に気付きました。