2

サブスクリプション時にのみ呼び出されるアクションのIObservableを作成するユーティリティメソッドを作成したいAND!これはSubscribeOn(...)ディレクティブの後に続きます。これが私の実装です。これはhttp://www.introtorx.comやその他のリソースから抽出できるものに基づいていますが、特定の1つのケースで失敗します。

    /// <summary>
    /// Makes an observable out of an action. Only at subscription the task will be executed. 
    /// </summary>
    /// <param name="action">The action.</param>
    /// <returns></returns>
    public static IObservable<Unit> MakeObservable_2(Action action)
    {
        return Observable.Create<Unit>(
            observer =>
            {
                return System.Reactive.Concurrency.CurrentThreadScheduler.Instance.Schedule(
                    () =>
                    {
                        try
                        {
                            action();
                            observer.OnNext(Unit.Default);
                            observer.OnCompleted();
                        }
                        catch (Exception ex)
                        {
                            observer.OnError(ex);
                        }
                    });
            });
    }

CurrrentThreadSchedulerを使用すると、SubscribeOn()で指定されたスケジューラーが使用されるようになることを期待していました。この実装は、.SubscribeOn(TaskPoolScheduler.Default)に対しては機能しますが、.SubscribeOn(Dispatcher.CurrentDispatcher)に対しては機能しません。以下のすべての単体テストに合格するように、上記の実装を変更していただけますか?

    [Test]
    public void RxActionUtilities_MakeObservableFromAction_WorksAsExpected()
    {
        ManualResetEvent evt = new ManualResetEvent(false);

        // Timeout of this test if sth. goes wrong below
        Task.Factory.StartNew(() =>
        {
            Thread.Sleep(5000);
            Console.WriteLine("Test timed out!");
            evt.Set();
        });

        int threadIdOfAction = -42;
        int threadIdOfSubscriptionContect = -43;
        bool subscriptionWasCalled = false;

        Action action = () =>
            {
                threadIdOfAction = Thread.CurrentThread.ManagedThreadId;
                Console.WriteLine("This is an action on thread " + threadIdOfAction);
            };

        var observable = RxActionUtilities.MakeObservable_2(action);

        threadIdOfSubscriptionContect = Thread.CurrentThread.ManagedThreadId;
        Console.WriteLine("Before subscription on thread " + threadIdOfSubscriptionContect);

        // The next line is the one I want to have working, but the subscription is never executed
        observable.SubscribeOn(Dispatcher.CurrentDispatcher).Subscribe(
            //observable.Subscribe( // would pass
            (unit) =>
            {
                Console.WriteLine("Subscription: OnNext " + threadIdOfAction + ", " + threadIdOfSubscriptionContect);
                subscriptionWasCalled = true;
            },
            (ex) => evt.Set(), () => evt.Set());

        Console.WriteLine("After subscription");

        evt.WaitOne();

        Assert.AreNotEqual(-42, threadIdOfAction);
        Assert.AreNotEqual(-43, threadIdOfSubscriptionContect);

        Assert.AreEqual(threadIdOfAction, threadIdOfSubscriptionContect);
        Assert.That(subscriptionWasCalled);
    }

    [Test]
    // This test passes with the current implementation
    public void RxActionUtilities_MakeObservableFromActionSubscribeOnDifferentThread_WorksAsExpected()
    {
        ManualResetEvent evt = new ManualResetEvent(false);

        // Timeout of this test if sth. goes wrong below
        Task.Factory.StartNew(() =>
        {
            Thread.Sleep(5000);
            Console.WriteLine("Test timed out!");
            evt.Set();
        });

        int threadIdOfAction = 42;
        int threadIdOfSubscriptionContect = 43;
        bool subscriptionWasCalled = false;

        Action action = () =>
        {
            threadIdOfAction = Thread.CurrentThread.ManagedThreadId;
            Console.WriteLine("This is an action on thread " + threadIdOfAction);
        };

        var observable = RxActionUtilities.MakeObservable_2(action);

        threadIdOfSubscriptionContect = Thread.CurrentThread.ManagedThreadId;
        Console.WriteLine("Before subscription on thread " + threadIdOfSubscriptionContect);

        // The next line is the one I want to have working, but the subscription is never executed
        observable.SubscribeOn(TaskPoolScheduler.Default).Subscribe(
            (unit) =>
            {
                Console.WriteLine("Subscription: OnNext " + threadIdOfAction + ", " + threadIdOfSubscriptionContect);
                subscriptionWasCalled = true;
            },
            (ex) => evt.Set(), () => evt.Set());

        evt.WaitOne();

        Console.WriteLine("After subscription");

        Assert.AreNotEqual(-42, threadIdOfAction);
        Assert.AreNotEqual(-43, threadIdOfSubscriptionContect);
        Assert.AreNotEqual(threadIdOfAction, threadIdOfSubscriptionContect);
        Assert.That(subscriptionWasCalled);
    }


    [Test]
    public void RxActionUtilities_MakeObservableFromAction_IsCancellable()
    {
        ManualResetEvent evt = new ManualResetEvent(false);

        // Timeout of this test if sth. goes wrong below
        Task.Factory.StartNew(() =>
        {
            Thread.Sleep(5000);
            Console.WriteLine("Test timed out!");
            evt.Set();
        });

        int threadIdOfAction = -42;
        int threadIdOfSubscriptionContect = -43;
        bool subscriptionWasCalled = false;
        bool actionTerminated = false;

        Action action = () =>
        {
            threadIdOfAction = Thread.CurrentThread.ManagedThreadId;

            for (int i = 0; i < 10; ++i)
            {
                Console.WriteLine("Some action #" + i);

                Thread.Sleep(200);
            }

            actionTerminated = true;
            evt.Set();
        };

        var observable = RxActionUtilities.MakeObservable_2(action);

        threadIdOfSubscriptionContect = Thread.CurrentThread.ManagedThreadId;
        Console.WriteLine("Before subscription on thread " + threadIdOfSubscriptionContect);

        var subscription =
            observable.SubscribeOn(TaskPoolScheduler.Default).Subscribe(
                (unit) =>
                {
                    Console.WriteLine("Subscription: OnNext " + threadIdOfAction + ", " + threadIdOfSubscriptionContect);
                    subscriptionWasCalled = true;
                },
                (ex) => evt.Set(), () => evt.Set());

        Console.WriteLine("After subscription");

        Thread.Sleep(1000);
        Console.WriteLine("Killing subscription ...");
        subscription.Dispose();
        Console.WriteLine("... done.");

        evt.WaitOne();

        Assert.IsFalse(actionTerminated);

        Assert.AreNotEqual(-42, threadIdOfAction);
        Assert.AreNotEqual(-43, threadIdOfSubscriptionContect);

        Assert.AreEqual(threadIdOfAction, threadIdOfSubscriptionContect);
        Assert.That(subscriptionWasCalled);
    }

アップデート

リーの精巧な答えに応えて、私はそれにもう一度試して、私の質問を再定式化します。IIUC私たちはそれを要約することができます

  • すでに開始されているアクションを停止することはできません
  • Dispatcher.CurrentDispatcherとその仕組みを完全に誤解しました。AFAICSこれはSubscribeOn()の引数として使用するのではなく、ObserveOnの引数としてのみ使用する必要があります。
  • CurrentThreadSchedulerを誤解しました

キャンセル可能なものを作成するには、キャンセルを認識するアクションが必要です。たとえば、を使用しAction<CancellationToken>ます。これが私の次の試みです。この実装がRxフレームワークにうまく適合すると思うかどうか、またはこれを再び改善できるかどうかを教えてください。

public static IObservable<Unit> 
    MakeObservable(Action<CancellationToken> action, IScheduler scheduler)
{
    return Observable.Create<Unit>(
        observer
        =>
        {
            // internally creates a new CancellationTokenSource
            var cancel = new CancellationDisposable(); 

            var scheduledAction = scheduler.Schedule(() =>
            {
                try
                {
                    action(cancel.Token);
                    observer.OnCompleted();
                }
                catch (Exception ex)
                {
                    observer.OnError(ex);
                }
            });

            // Cancellation before execution of action is performed 
            // by disposing scheduledAction
            // Cancellation during execution of action is performed 
            // by disposing cancel
            return new CompositeDisposable(cancel, scheduledAction);
        });
}

TestSchedulerそして、あなたがそれにいるなら:私はsを使ってこれをテストする方法を理解できませんでした:

[Test]
public void MakeObservableFromCancelableAction_CancellationTakesPlaceWithTrueThread()
{
    var scheduler = NewThreadScheduler.Default;

    Action<CancellationToken> action =
        (cancellationToken) =>
        {
            for (int i = 0; i < 10; ++i)
            {
                Console.WriteLine("Some action #" + i);

                if (cancellationToken.IsCancellationRequested)
                {
                    break;
                }

                Thread.Sleep(20);
                // Hoping that the disposal of the subscription stops 
                // the loop before we reach i == 4.
                Assert.Less(i, 4);
            }
        };

    var observable = RxActionUtilities.MakeObservable(action, scheduler);

    var subscription = observable.Subscribe((unit) => { });

    Thread.Sleep(60);

    subscription.Dispose();
}
4

2 に答える 2

2

コードをもっと簡単にすることができ、テストもはるかに簡単にすることができると思います。Rxの利点は、すべてのTask / Thread/ManualResetEventを廃止できることです。また、カスタムコードの代わりに、NUnitの[Timeout]属性を使用することもできると思います。

とにかく...@Perは正しい、Observable.Startはあなたが探しているものです。あなたはそれにアクションとISchedulerを渡します。これはまさにあなたが望むもののようです。

[Test]
public void Run_Action_as_IOb_on_scheduler_with_ObStart()
{
    var scheduler = new TestScheduler();
    var flag = false;
    Action action = () => { flag = true; };

    var subscription = Observable.Start(action, scheduler)
                                    .Subscribe();

    Assert.IsFalse(flag);
    scheduler.AdvanceBy(1);
    Assert.IsTrue(flag);
    subscription.Dispose(); //Not required as the sequence will have completed and then auto-detached.
}

しかし、あなたはそれがいくつかの奇妙な振る舞いをしていることに気付くかもしれません(少なくともこのPCで私が持っているV1では)。具体的には、Observable.Startはアクションをただちに実行するだけで、実際には監視可能なシーケンスがサブスクライブされるのを待ちません。また、このため、サブスクライブを呼び出してから、アクションが実行される前にサブスクリプションを破棄しても効果はありません。うーん、うーん。

[Test]
public void Run_Action_as_IOb_on_scheduler_with_ObStart_dispose()
{
    var scheduler = new TestScheduler();
    var flag = false;
    Action action = () => { flag = true; };

    var subscription = Observable.Start(action, scheduler).Subscribe();


    Assert.IsFalse(flag);
    subscription.Dispose();
    scheduler.AdvanceBy(1);
    Assert.IsFalse(flag);   //FAILS. Oh no! this is true!
}
[Test]
public void Run_Action_as_IOb_on_scheduler_with_ObStart_no_subscribe()
{
    var scheduler = new TestScheduler();
    var flag = false;
    Action action = () => { flag = true; };

    Observable.Start(action, scheduler);
    //Note the lack of subscribe?!

    Assert.IsFalse(flag);
    scheduler.AdvanceBy(1);
    Assert.IsFalse(flag);//FAILS. Oh no! this is true!
}

ただし、Observable.Createを使用するというあなたの道をたどることはできます。あなたはとても近くにいますが、Createデリゲートでスケジュールを設定する必要はありません。Rxを信頼してこれを行ってください。

[Test]
public void Run_Action_as_IOb_on_scheduler_with_ObCreate()
{
    var scheduler = new TestScheduler();
    var flag = false;
    Action action = () => { flag = true; };

    var subscription = Observable.Create<Unit>(observer =>
        {
            try
            {
                action();
                observer.OnNext(Unit.Default);
                observer.OnCompleted();
            }
            catch (Exception ex)
            {
                observer.OnError(ex);
            }
            return Disposable.Empty;
        })
        .SubscribeOn(scheduler)
        .Subscribe();   //Without subscribe, the action wont run.

    Assert.IsFalse(flag);
    scheduler.AdvanceBy(1);
    Assert.IsTrue(flag);
    subscription.Dispose(); //Not required as the sequence will have completed and then auto-detached.
}

[Test]
public void Run_Action_as_IOb_on_scheduler_with_ObCreate_dispose()
{
    var scheduler = new TestScheduler();
    var flag = false;
    Action action = () => { flag = true; };

    var subscription = Observable.Create<Unit>(observer =>
    {
        try
        {
            action();
            observer.OnNext(Unit.Default);
            observer.OnCompleted();
        }
        catch (Exception ex)
        {
            observer.OnError(ex);
        }
        return Disposable.Empty;
    })
        .SubscribeOn(scheduler)
        .Subscribe();   //Without subscribe, the action wont run.

    Assert.IsFalse(flag);
    subscription.Dispose();
    scheduler.AdvanceBy(1);
    Assert.IsFalse(flag);   //Subscription was disposed before the scheduler was able to run, so the action did not run.
}

処理中のアクションの途中で実際のアクションをキャンセルできるようにしたい場合は、これよりも高度な処理を行う必要があります。

最終的な実装は単純です。

public static class RxActionUtilities
{
    /// <summary>
    /// Makes an observable out of an action. Only at subscription the task will be executed. 
    /// </summary>
    /// <param name="action">The action.</param>
    /// <returns></returns>
    /// <example>
    /// <code>
    /// <![CDATA[
    /// RxActionUtilities.MakeObservable_3(myAction)
    ///                  .SubscribeOn(_schedulerProvider.TaskPoolScheduler)
    ///                  .Subscribe(....);
    /// 
    /// ]]>
    /// </code>
    /// </example>
    public static IObservable<Unit> MakeObservable_3(Action action)
    {
        return Observable.Create<Unit>(observer =>
            {
                try
                {
                    action();
                    observer.OnNext(Unit.Default);
                    observer.OnCompleted();
                }
                catch (Exception ex)
                {
                    observer.OnError(ex);
                }
                return Disposable.Empty;
            });
    }
}

それがお役に立てば幸いです。

編集:ユニットテストでのディスパッチャーの使用法について説明します。混乱を増すために別のレイヤー(Rx)を適用する前に、まずそれがどのように機能するかを理解する必要があると思います。WPFでコーディングするときにRxがもたらす主な利点の1つは、スケジューラーを介したディスパッチャーの抽象化です。これにより、WPFでの同時実行性を簡単にテストできます。たとえば、ここでのこの簡単なテストは失敗します。

[Test, Timeout(2000)]
public void DispatcherFail()
{
    var wasRun = false;
    Action MyAction = () =>
        {
            Console.WriteLine("Running...");
            wasRun = true;
            Console.WriteLine("Run.");
        };
    Dispatcher.CurrentDispatcher.BeginInvoke(MyAction);

    Assert.IsTrue(wasRun);
}

これを実行すると、コンソールに何も出力されないため、競合状態が発生せず、アクションが実行されないことがわかります。これは、ディスパッチャがメッセージループを開始していないためです。このテストを修正するには、厄介なインフラストラクチャコードでテストを埋める必要があります。

[Test, Timeout(2000)]
public void Testing_with_Dispatcher_BeginInvoke()
{
    var frame = new DispatcherFrame();  //1 - The Message loop
    var wasRun = false;
    Action MyAction = () =>
    {
        Console.WriteLine("Running...");
        wasRun = true;
        Console.WriteLine("Run.");
        frame.Continue = false;         //2 - Stop the message loop, else we hang forever
    };
    Dispatcher.CurrentDispatcher.BeginInvoke(MyAction);

    Dispatcher.PushFrame(frame);        //3 - Start the message loop

    Assert.IsTrue(wasRun);
}

したがって、WPFでの同時実行が必要なすべてのテストでこれを実行したくないことは明らかです。frame.Continue=falseを制御できないアクションに挿入しようとすると悪夢になります。幸い、IScheudlerは、Scheduleメソッドを介して必要なものをすべて公開しています。

次のCurrentThreadSchedulerは、SynchronizationContext(これはあなたが思っていることです)ではなく、トランポリンとして考える必要があります。

于 2012-12-18T17:04:39.190 に答える
-1

Observable.Startはあなたが探しているものだと思います。 http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.start(v=vs.103).aspx

于 2012-12-17T13:25:37.083 に答える