9

IObservable<int>私は500msごとにパルスジェネレータとして機能する非常にシンプルなものを持っています:

var pulses = Observable.GenerateWithTime(0, i => true, i => i + 1, i => i,
                                         i => TimeSpan.FromMilliseconds(500))

そして、私はCancellationTokenSource(同時に進行している他の作業をキャンセルするために使用されます)を持っています。

キャンセルトークンソースを使用して、監視可能なシーケンスをキャンセルするにはどうすればよいですか?

4

5 に答える 5

12

これは古いスレッドですが、将来の参考のために、これを行うためのより簡単な方法を示します。

CancellationTokenをお持ちの場合は、おそらくすでにタスクを操作しています。したがって、それをタスクに変換し、フレームワークにバインディングを実行させます。

using System.Reactive.Threading.Tasks;
...
var task = myObservable.ToTask(cancellationToken);

これにより、タスクがキャンセルされたときに破棄される内部サブスクライバーが作成されます。ほとんどのオブザーバブルはサブスクライバーが存在する場合にのみ値を生成するため、これでほとんどの場合にうまくいきます。

ここで、何らかの理由で破棄する必要のある実際のオブザーバブルがある場合(親タスクがキャンセルされた場合、ホットオブザーバブルはもう重要ではない可能性があります)、これは継続して実現できます。

disposableObservable.ToTask(cancellationToken).ContinueWith(t => {
    if (t.Status == TaskStatus.Canceled)
        disposableObservable.Dispose();
});
于 2014-09-03T13:33:58.707 に答える
8

GenerateWithTimeを使用している場合(現在、Generateがタイムスパン関数のオーバーロードを渡すことに置き換えられています)、次のように2番目のパラメーターを置き換えてキャンセルトークンの状態を評価できます。

var pulses = Observable.Generate(0,
    i => !ts.IsCancellationRequested,
    i => i + 1,
    i => i,
    i => TimeSpan.FromMilliseconds(500));

または、キャンセルトークンを設定するイベントをオブザーバブル自体に変換できる場合は、次のようなものを使用できます。

pulses.TakeUntil(CancelRequested);

より詳細な説明はhttp://www.thinqlinq.com/Post.aspx/Title/Cancelling-a-Reactive-Extensions-Observableにも投稿しました。

于 2011-07-21T13:27:25.190 に答える
2

次のスニペットでIObservableサブスクリプションを接続できますCancellationTokenSource

var pulses = Observable.GenerateWithTime(0,
    i => true, i => i + 1, i => i,
    i => TimeSpan.FromMilliseconds(500));

// Get your CancellationTokenSource
CancellationTokenSource ts = ...

// Subscribe
ts.Token.Register(pulses.Subscribe(...).Dispose);
于 2011-07-20T09:55:12.953 に答える
1

観測可能なシーケンスをキャンセルするための2つの便利な演算子を次に示します。それらの違いは、キャンセルの場合に何が起こるかです。原因はシーケンスのTakeUntil正常な完了(OnCompleted)を引き起こし、一方、WithCancellation原因は例外的な終了(OnError)を引き起こします。

/// <summary>Returns the elements from the source observable sequence until the
/// CancellationToken is canceled.</summary>
public static IObservable<TSource> TakeUntil<TSource>(
    this IObservable<TSource> source, CancellationToken cancellationToken)
{
    return source
        .TakeUntil(Observable.Create<Unit>(observer =>
            cancellationToken.Register(() => observer.OnNext(default))));
}

/// <summary>Ties a CancellationToken to an observable sequence. In case of
/// cancellation propagates an OperationCanceledException to the observer.</summary>
public static IObservable<TSource> WithCancellation<TSource>(
    this IObservable<TSource> source, CancellationToken cancellationToken)
{
    return source
        .TakeUntil(Observable.Create<Unit>(o => cancellationToken.Register(() =>
            o.OnError(new OperationCanceledException(cancellationToken)))));
}

使用例:

var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));

var pulses = Observable
    .Generate(0, i => true, i => i + 1, i => i, i => TimeSpan.FromMilliseconds(500))
    .WithCancellation(cts.Token);

注:キャンセルの場合、上記のカスタムオペレーターは、基になるオブザーバブルから即座にサブスクライブを解除します。これは、オブザーバブルに副作用が含まれている場合に考慮すべきことです。副作用を実行するオペレーターの前に置くと、TakeUntil(cts.Token)副作用が完了するまで(正常な終了)、オブザーバブル全体の完了が延期されます。副作用の後にそれを置くと、キャンセルが瞬時に行われ、実行中のコードがファイアアンドフォーゲット方式で監視されずに実行を継続する可能性があります。

于 2020-12-08T16:09:00.617 に答える
0

IDisposableサブスクライブからインスタンスを取得します。それを呼びなさいDispose()

于 2011-07-20T09:45:11.303 に答える