6

私は次のセットアップを持っています

IObservable<Data> source = ...;

source
    .Select(data=>VeryExpensiveOperation(data))
    .Subscribe(data=>Console.WriteLine(data));

通常、イベントは妥当な時間枠で区切られます。ユーザーがフォームのテキスト ボックスを更新しているとします。VeryExpensiveOperation 完了までに 5 秒かかる場合があり、その間、砂時計が画面に表示されます。

ただし、5秒間にユーザーがテキストボックスを再度更新した場合、新しいテキストボックスが開始される前に現在のテキストボックスにキャンセルを送信したいと思いVeryExpensiveOperation ます。

私は次のようなシナリオを想像します

source
    .SelectWithCancel((data, cancelToken)=>VeryExpensiveOperation(data, token))
    .Subscribe(data=>Console.WriteLine(data));

したがって、ラムダが呼び出されるたびに、キャンセルを管理するために使用できる cancelToken で呼び出されますTask。ただし、現在、Task、CancelationToken、および RX が混在しています。すべてを組み合わせる方法がよくわかりません。助言がありますか。

XUnit を使用してオペレーターをテストする方法を理解するためのボーナスポイント:)

最初の試み

    public static IObservable<U> SelectWithCancelation<T, U>( this IObservable<T> This, Func<CancellationToken, T, Task<U>> fn )
    {
        CancellationTokenSource tokenSource = new CancellationTokenSource();

        return This
            .ObserveOn(Scheduler.Default)
            .Select(v=>{
                tokenSource.Cancel();
                tokenSource=new CancellationTokenSource();
                return new {tokenSource.Token, v};
            })
            .SelectMany(o=>Observable.FromAsync(()=>fn(o.Token, o.v)));
    }

まだテストされていません。完了しないタスクが、イベントを発生させずに完了する IObservable を生成することを願っていOnNextます。

4

2 に答える 2

13

VeryExpensiveOperationキャンセル可能な非同期のものとしてモデル化する必要があります。TaskまたはのいずれかIObservable。私はそれが次のタスクであると仮定しますCancellationToken

Task<TResult> VeryExpensiveOperationAsync<TSource, TResult>(TSource item, CancellationToken token);

次に、次のようにします。

source
    .Select(item => Observable.DeferAsync(async token =>
    {
        // do not yield the observable until after the operation is completed
        // (ie do not just do VeryExpensiveOperation(...).ToObservable())
        // because DeferAsync() will dispose of the token source as soon
        // as you provide the observable (instead of when the observable completes)
        var result = await VeryExpensiveOperationAsync(item, token);
        return Observable.Return(result);
    })
    .Switch();

Select、サブスクライブするとトークンを作成し、操作を開始する遅延オブザーバブルを作成するだけです。操作が完了する前にオブザーバブルが登録解除された場合、トークンはキャンセルされます。

は、サブスクライブしていた以前のオブザーバブルからサブスクライブを解除して、 からSwitch出てくる新しい各オブザーバブルをサブスクライブします。Select

これはあなたが望む効果があります。

PSこれは簡単にテストできます。VeryExpensiveOperationモック ソースと単体テストによって提供されるを使用するモックを提供するだけでTaskCompletetionSource、単体テストは新しいソース項目がいつ生成され、いつタスクが完了するかを正確に制御できます。このようなもの:

void SomeTest()
{
    // create a test source where the values are how long
    // the mock operation should wait to do its work.
    var source = _testScheduler.CreateColdObservable<int>(...);

    // records the actions (whether they completed or canceled)
    List<bool> mockActionsCompleted = new List<bool>();
    var resultStream = source.SelectWithCancellation((token, delay) =>
    {
        var tcs = new TaskCompletionSource<string>();
        var tokenRegistration = new SingleAssignmentDisposable();

        // schedule an action to complete the task
        var d = _testScheduler.ScheduleRelative(delay, () =>
        {
           mockActionsCompleted.Add(true);
           tcs.SetResult("done " + delay);
           // stop listening to the token
           tokenRegistration.Dispose();
        });

        // listen to the token and cancel the task if the token signals
        tokenRegistration.Disposable = token.Register(() =>
        {
           mockActionsCompleted.Add(false);
           tcs.TrySetCancelled();
           // cancel the scheduled task
           d.Dispose();
        });

        return tcs.Task;
    });

    // subscribe to resultStream
    // start the scheduler
    // assert the mockActionsCompleted has the correct sequence
    // assert the results observed were what you expected.
}

testScheduler.Start()新しいアクションが動的にスケジュールされているため、使用中に問題が発生する可能性があります。while ループのtestScheduler.AdvanceBy(1)方がうまくいくかもしれません。

于 2013-07-24T15:08:43.920 に答える
-1

なぜスロットルを使わないのですか?

http://rxwiki.wikidot.com/101samples#toc30

スロットルは、指定された期間にイベントが生成されなくなるまで、イベントの流れを停止します。たとえば、テキスト ボックスの TextChanged イベントを 0.5 秒に調整すると、ユーザーが入力を 0.5 秒間停止するまで、イベントは渡されません。これは、キーストロークごとに新しい検索を開始するのではなく、ユーザーが一時停止するまで待ちたい検索ボックスで役立ちます。

SearchTextChangedObservable = Observable.FromEventPattern<TextChangedEventArgs>(this.textBox, "TextChanged");
_currentSubscription = SearchTextChangedObservable.Throttle(TimeSpan.FromSeconds(.5)).ObserveOnDispatcher
于 2013-07-24T16:12:20.367 に答える