0

Ollie Riches のブログ投稿Trying to be more Functional with Rxから抜粋した以下を読み、著者と同じように疑問に思うようになりました: なぜOnCompletedが渡されないのですか? 誰かがここで何が起こっているか教えてもらえますか? おそらく、恥ずかしいほど単純なことでしょうか?

便宜上、コードを少し変更してここに再掲します (ここで彼のコードをリッピングすることが受け入れられない場合は、Ollie に謝罪します)。

public static class RxExtensions
{
    public static IObservable<T> Suspendable<T>(this IObservable<T> stream, IObservable<bool> suspend, bool initialState = false)
    {
        return Observable.Create<T>(o =>
        {
            var disposable = suspend.StartWith(initialState)
                    .DistinctUntilChanged()
                    .Select(s => s ? Observable.Empty<T>() : stream)
                    .Switch()
                    .Subscribe(o);

            return disposable;
        });
    }
}

var testScheduler = new TestScheduler();
var generatorCount = 10;

//If the limit will be hardcoded to something less than generatorCount, an exception will be
//thrown and the exception object will be set. Why it doesn't happen to completed in the following?
var generator = Observable.Generate(1,
    x => x <= generatorCount,
    x => x + 1,
    x => { if(x != 11) { Console.WriteLine(x); return x; } else { throw new ArgumentException(); } },
    x => TimeSpan.FromSeconds(1),
    testScheduler);


Exception exception = null;
var completed = false;
generator.Suspendable(new Subject<bool>()).Subscribe(_ => { }, e => exception = e, () => completed = true);   
testScheduler.AdvanceBy(TimeSpan.FromMilliseconds(1001000).Ticks);

Console.WriteLine(exception);
Console.WriteLine(completed);

記録のために、一時停止したストリームはイベントを蓄積し、一時停止したストリームはイベントをスキップするという区別で、一時停止および停止できるストリームを作成しようと考えていました。特に一時停止ビットに制限または「戦略を保存」することを考えている場合は、予想よりも少し複雑に見え始めました. しかたがない...

<編集:興味深いことに、Pausable のRxJS 実装に気付きました。

4

2 に答える 2

1

オブザーバーは、suspendストリームとストリームの両方にサブスクライブしsourceます。この結合されたストリームは、両方のストリームが完了するまで完了しません。基本的にsourceストリームは完了しますが、Suspendableはさらに一時停止/一時停止解除の信号が来るかどうかを待っています。その場合、ソース ストリームを再サブスクライブします。

ソース ストリームが完了したときに一時停止可能なストリームを完了することは可能ですが、おそらくメソッドの目的に反することになります。基本的に、何かがソース ストリームをサブスクライブしたままにし、ソースが完了すると一時停止したストリームを終了する必要があります。次のようなものでそれを行うことができます:

var shared = stream.Publish();
var pausable = suspend
    .StartWith(initialState)
    .TakeUntil(shared.LastOrDefaultAsync())
    .DistinctUntilChanged()
    .Select(p => p ? shared : Observable.Empty<T>())
    .Switch();
var disposable = new CompositeDisposable(pausable.Subscribe(o), shared.Connect());
return disposable;
于 2014-10-24T20:57:40.643 に答える
0

サブスクリプションが Observable.Empty() にあり、_generator の子孫ではないため、Completed は送信されません

だから、AI を使ってさらに良い答えを出します CombineLatest

public static IObservable<T> Suspendable<T>(
    this IObservable<T> source,
    IObservable<bool> pauser,
    bool initialState = false)
{
    return 
        source.CombineLatest(pauser.StartWith(initialState), 
                             (value, paused) => new {value, paused})
              .Where(_=>!_.paused)
              .Select(_=>_.value);
}
于 2016-10-09T07:54:58.927 に答える