2

私はRxを初めて使用するので、ご容赦ください。

Task<T>を でラップしたいIObservable<T>。ここまでは順調ですね:

Task<T> task = Task.Factory.StartNew(...);
IObservable<T> obs = task.ToObservable();

今、私が欲しいのは、オブザーバーが購読を解除したときにタスクをキャンセルするように通知することです:

var cancel = new CancellationToken();
Task<T> task = Task.Factory.StartNew(..., cancel);

IObservable<T> obs = task.ToObservable(); //there should be a way to tie the cancel token
                                          //to the IObservable (?)

IDisposable disposable = obs.Subscribe(...);
Thread.Sleep(1000);
disposable.Dispose(); // this should signal the task to cancel

それ、どうやったら出来るの?

FWIW この接線を生成したシナリオは次のとおりです: Rx とタスク - 新しいタスクが生成されたときに実行中のタスクをキャンセルしますか?

4

2 に答える 2

2

次のようなメソッドがあるとします。

Task<Gizmo> GetGizmoAsync(int id, CancellationToken cancellationToken);

IObservable<Gizmo>以下を使用して、サブスクライブが開始され、サブスクライブ解除がキャンセルされる場所にそれを変えることができTask<Gizmo>ます。

IObservable<Gizmo> observable = Observable.FromAsync(
    cancellationToken => GetGizmoAsync(7, cancellationToken));

// Starts the task:
IDisposable subscription = observable.Subscribe(...);

// Cancels the task if it is still running:
subscription.Dispose();
于 2015-04-24T16:33:34.167 に答える
2

を使用して、私が考えることができる最も簡単な方法を次に示しますObservable.Create

static IObservable<int> SomeRxWork()
{
    return Observable.Create<int>(o =>
    {
        CancellationTokenSource cts = new CancellationTokenSource();
        IDisposable sub = SomeAsyncWork(cts.Token).ToObservable().Subscribe(o);
        return new CompositeDisposable(sub, new CancellationDisposable(cts));
    });
}

static Task<int> SomeAsyncWork(CancellationToken token);

コメントで示唆した最初の方法は、実際にはかなり冗長です。

static IObservable<int> SomeRxWork()
{
    return Observable.Create<int>(async (o, token) =>
    {
        try
        {
            o.OnNext(await SomeAsyncWork(token));
            o.OnCompleted();
        }
        catch (OperationCanceledException)
        {
        }
        catch (Exception ex)
        {
            o.OnError(ex);
        }
    });
}
于 2013-08-28T11:51:36.670 に答える