私は次のセットアップを持っています
IObservable<Data> source = ...;
source
.Select(data=>VeryExpensiveOperation(data))
.Subscribe(data=>Console.WriteLine(data));
通常、イベントは妥当な時間枠で区切られます。ユーザーがフォームのテキスト ボックスを更新しているとします。VeryExpensiveOperation
完了までに 5 秒かかる場合があり、その間、砂時計が画面に表示されます。
ただし、5秒間にユーザーがテキストボックスを再度更新した場合、新しいテキストボックスが開始される前に現在のテキストボックスにキャンセルを送信したいと思いVeryExpensiveOperation
ます。
私は次のようなシナリオを想像します
source
.SelectWithCancel((data, cancelToken)=>VeryExpensiveOperation(data, token))
.Subscribe(data=>Console.WriteLine(data));
したがって、ラムダが呼び出されるたびに、キャンセルを管理するために使用できる cancelToken で呼び出されますTask
。ただし、現在、Task、CancelationToken、および RX が混在しています。すべてを組み合わせる方法がよくわかりません。助言がありますか。
XUnit を使用してオペレーターをテストする方法を理解するためのボーナスポイント:)
最初の試み
public static IObservable<U> SelectWithCancelation<T, U>( this IObservable<T> This, Func<CancellationToken, T, Task<U>> fn )
{
CancellationTokenSource tokenSource = new CancellationTokenSource();
return This
.ObserveOn(Scheduler.Default)
.Select(v=>{
tokenSource.Cancel();
tokenSource=new CancellationTokenSource();
return new {tokenSource.Token, v};
})
.SelectMany(o=>Observable.FromAsync(()=>fn(o.Token, o.v)));
}
まだテストされていません。完了しないタスクが、イベントを発生させずに完了する IObservable を生成することを願っていOnNext
ます。