タスクを使用する必要がありますか?
純粋に Observables を使用することに満足している場合は、これを自分でうまく行うことができます。
次のようにしてみてください。
var query =
Observable.Create<int>(o =>
{
var cancelling = false;
var cancel = Disposable.Create(() =>
{
cancelling = true;
});
var subscription = Observable.Start(() =>
{
for (var i = 0; i < 100; i++)
{
Thread.Sleep(10); //1000 ms in total
if (cancelling)
{
Console.WriteLine("Cancelled on {0}", i);
return -1;
}
}
Console.WriteLine("Done");
return 42;
}).Subscribe(o);
return new CompositeDisposable(cancel, subscription);
});
このオブザーバブルは for ループで を使用してハードワークを行っていますThread.Sleep(10);
が、オブザーバブルが破棄されるとループが終了し、集中的な CPU 作業が停止します。次に、標準の RxDispose
を で使用しSwitch
て、進行中の作業をキャンセルできます。
それをメソッドにまとめたい場合は、これを試してください:
public static IObservable<T> Start<T>(Func<Func<bool>, T> work)
{
return Observable.Create<T>(o =>
{
var cancelling = false;
var cancel = Disposable
.Create(() => cancelling = true);
var subscription = Observable
.Start(() => work(() => cancelling))
.Subscribe(o);
return new CompositeDisposable(cancel, subscription);
});
}
そして、次のような関数で呼び出します。
Func<Func<bool>, int> work = cancelling =>
{
for (var i = 0; i < 100; i++)
{
Thread.Sleep(10); //1000 ms in total
if (cancelling())
{
Console.WriteLine("Cancelled on {0}", i);
return -1;
}
}
Console.WriteLine("Done");
return 42;
};
これが機能することを証明した私のコードは次のとおりです。
var disposable =
ObservableEx
.Start(work)
.Subscribe(x => Console.WriteLine(x));
Thread.Sleep(500);
disposable.Dispose();
出力として「Cancelled on 50」(「Cancelled on 51」の場合もある) を取得しました。