観測可能なシーケンスをキャンセルするための2つの便利な演算子を次に示します。それらの違いは、キャンセルの場合に何が起こるかです。原因はシーケンスのTakeUntil
正常な完了(OnCompleted
)を引き起こし、一方、WithCancellation
原因は例外的な終了(OnError
)を引き起こします。
/// <summary>Returns the elements from the source observable sequence until the
/// CancellationToken is canceled.</summary>
public static IObservable<TSource> TakeUntil<TSource>(
this IObservable<TSource> source, CancellationToken cancellationToken)
{
return source
.TakeUntil(Observable.Create<Unit>(observer =>
cancellationToken.Register(() => observer.OnNext(default))));
}
/// <summary>Ties a CancellationToken to an observable sequence. In case of
/// cancellation propagates an OperationCanceledException to the observer.</summary>
public static IObservable<TSource> WithCancellation<TSource>(
this IObservable<TSource> source, CancellationToken cancellationToken)
{
return source
.TakeUntil(Observable.Create<Unit>(o => cancellationToken.Register(() =>
o.OnError(new OperationCanceledException(cancellationToken)))));
}
使用例:
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
var pulses = Observable
.Generate(0, i => true, i => i + 1, i => i, i => TimeSpan.FromMilliseconds(500))
.WithCancellation(cts.Token);
注:キャンセルの場合、上記のカスタムオペレーターは、基になるオブザーバブルから即座にサブスクライブを解除します。これは、オブザーバブルに副作用が含まれている場合に考慮すべきことです。副作用を実行するオペレーターの前に置くと、TakeUntil(cts.Token)
副作用が完了するまで(正常な終了)、オブザーバブル全体の完了が延期されます。副作用の後にそれを置くと、キャンセルが瞬時に行われ、実行中のコードがファイアアンドフォーゲット方式で監視されずに実行を継続する可能性があります。