7

Rx で処理したいユーザー インタラクション シナリオがあります。

このシナリオは、標準的な「ユーザーが入力を停止したときに何らかの作業を行う」(通常、ユーザーがこれまでに入力した内容を検索する) (1) に似ていますが、次のことも必要です。

  • (2) "do some work" ユニットの最新の結果のみを取得する (以下を参照)
  • (3)新しい作業単位が開始されたら、進行中の作業をキャンセルします(私の場合はCPUを集中的に使用します)

(1)IObservableユーザー イベントにを使用し、イベント.Throttle()間の一時停止 (「ユーザーが入力を停止」) でのみトリガーするように調整します。

それから、私は.Select(_ => CreateMyTask(...).ToObservable())

これによりIObservable<IObservable<T>>、内部の各オブザーバブルが単一のタスクをラップする場所が得られます。

(2) を取得するには、最終的.Switch()に最新の作業単位からの結果のみを取得するように申請します。

(3) - 保留中のタスクをキャンセルしますか?

私が正しく理解していれば、新しい inner があるたびにIObservable<T>.Switch()メソッドはそれにサブスクライブし、以前のものからサブスクライブを解除して、 Dispose().
たぶん、タスクをキャンセルするように何らかの方法で配線することができますか?

4

2 に答える 2

3

タスクを使用する必要がありますか?

純粋に Observables を使用することに満足している場合は、これを自分でうまく行うことができます。

次のようにしてみてください。

var query =
    Observable.Create<int>(o =>
    {
        var cancelling = false;
        var cancel = Disposable.Create(() =>
        {
            cancelling = true;
        });
        var subscription = Observable.Start(() =>
        {
            for (var i = 0; i < 100; i++)
            {
                Thread.Sleep(10); //1000 ms in total
                if (cancelling)
                {
                    Console.WriteLine("Cancelled on {0}", i);
                    return -1;
                }
            }
            Console.WriteLine("Done");
            return 42;
        }).Subscribe(o);
        return new CompositeDisposable(cancel, subscription);
    });

このオブザーバブルは for ループで を使用してハードワークを行っていますThread.Sleep(10);が、オブザーバブルが破棄されるとループが終了し、集中的な CPU 作業が停止します。次に、標準の RxDisposeを で使用しSwitchて、進行中の作業をキャンセルできます。

それをメソッドにまとめたい場合は、これを試してください:

public static IObservable<T> Start<T>(Func<Func<bool>, T> work)
{
    return Observable.Create<T>(o =>
    {
        var cancelling = false;
        var cancel = Disposable
            .Create(() => cancelling = true);
        var subscription = Observable
            .Start(() => work(() => cancelling))
            .Subscribe(o);
        return new CompositeDisposable(cancel, subscription);
    });
}

そして、次のような関数で呼び出します。

Func<Func<bool>, int> work = cancelling =>
{
    for (var i = 0; i < 100; i++)
    {
        Thread.Sleep(10); //1000 ms in total
        if (cancelling())
        {
            Console.WriteLine("Cancelled on {0}", i);
            return -1;
        }
    }
    Console.WriteLine("Done");
    return 42;
};

これが機能することを証明した私のコードは次のとおりです。

var disposable =
    ObservableEx
        .Start(work)
        .Subscribe(x => Console.WriteLine(x));

Thread.Sleep(500);
disposable.Dispose();

出力として「Cancelled on 50」(「Cancelled on 51」の場合もある) を取得しました。

于 2013-08-28T02:33:33.847 に答える