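I want to create a utility method that creates an IObservable for an Action which is only invoked at subscription time and which also honors the SubscribeOn(...) directive. Here is my implementation. It is based on what I could extract from http://www.introtorx.com and other resources, but it fails in one specific case: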

    /// <summary>
    /// Makes an observable out of an action. Only at subscription the task will be executed.
    /// </summary>
    /// <param name="action">The action.</param>
    /// <returns></returns>
    public static IObservable<Unit> MakeObservable_2(Action action)
    {
        return Observable.Create<Unit>(
            observer =>
            {
                return System.Reactive.Concurrency.CurrentThreadScheduler.Instance.Schedule(
                    () =>
                    {
                        try
                        {
                            action();
                            observer.OnNext(Unit.Default);
                            observer.OnCompleted();
                        }
                        catch (Exception ex)
                        {
                            observer.OnError(ex);
                        }
                    });
            });
    }

I was hoping that using the CurrentThreadScheduler would cause the scheduler given in SubscribeOn() to be used. This implementation works for .SubscribeOn(TaskPoolScheduler.Default), but not for .SubscribeOn(Dispatcher.CurrentDispatcher). Could you please change the above implementation so that all of the unit tests below pass?

    [Test]
    public void RxActionUtilities_MakeObservableFromAction_WorksAsExpected()
    {
        ManualResetEvent evt = new ManualResetEvent(false);

        // Timeout of this test if sth. goes wrong below
        Task.Factory.StartNew(() =>
        {
            Thread.Sleep(5000);
            Console.WriteLine("Test timed out!");
            evt.Set();
        });

        int threadIdOfAction = -42;
        int threadIdOfSubscriptionContect = -43;
        bool subscriptionWasCalled = false;

        Action action = () =>
        {
            threadIdOfAction = Thread.CurrentThread.ManagedThreadId;
            Console.WriteLine("This is an action on thread " + threadIdOfAction);
        };

        var observable = RxActionUtilities.MakeObservable_2(action);

        threadIdOfSubscriptionContect = Thread.CurrentThread.ManagedThreadId;
        Console.WriteLine("Before subscription on thread " + threadIdOfSubscriptionContect);

        // The next line is the one I want to have working, but the subscription is never executed
        observable.SubscribeOn(Dispatcher.CurrentDispatcher).Subscribe(
        //observable.Subscribe( // would pass
            (unit) =>
            {
                Console.WriteLine("Subscription: OnNext " + threadIdOfAction + ", " + threadIdOfSubscriptionContect);
                subscriptionWasCalled = true;
            },
            (ex) => evt.Set(), () => evt.Set());

        Console.WriteLine("After subscription");

        evt.WaitOne();

        Assert.AreNotEqual(-42, threadIdOfAction);
        Assert.AreNotEqual(-43, threadIdOfSubscriptionContect);
        Assert.AreEqual(threadIdOfAction, threadIdOfSubscriptionContect);
        Assert.That(subscriptionWasCalled);
    }

    [Test]
    // This test passes with the current implementation
    public void RxActionUtilities_MakeObservableFromActionSubscribeOnDifferentThread_WorksAsExpected()
    {
        ManualResetEvent evt = new ManualResetEvent(false);

        // Timeout of this test if sth. goes wrong below
        Task.Factory.StartNew(() =>
        {
            Thread.Sleep(5000);
            Console.WriteLine("Test timed out!");
            evt.Set();
        });

        int threadIdOfAction = 42;
        int threadIdOfSubscriptionContect = 43;
        bool subscriptionWasCalled = false;

        Action action = () =>
        {
            threadIdOfAction = Thread.CurrentThread.ManagedThreadId;
            Console.WriteLine("This is an action on thread " + threadIdOfAction);
        };

        var observable = RxActionUtilities.MakeObservable_2(action);

        threadIdOfSubscriptionContect = Thread.CurrentThread.ManagedThreadId;
        Console.WriteLine("Before subscription on thread " + threadIdOfSubscriptionContect);

        // The next line is the one I want to have working, but the subscription is never executed
        observable.SubscribeOn(TaskPoolScheduler.Default).Subscribe(
            (unit) =>
            {
                Console.WriteLine("Subscription: OnNext " + threadIdOfAction + ", " + threadIdOfSubscriptionContect);
                subscriptionWasCalled = true;
            },
            (ex) => evt.Set(), () => evt.Set());

        evt.WaitOne();

        Console.WriteLine("After subscription");

        Assert.AreNotEqual(-42, threadIdOfAction);
        Assert.AreNotEqual(-43, threadIdOfSubscriptionContect);
        Assert.AreNotEqual(threadIdOfAction, threadIdOfSubscriptionContect);
        Assert.That(subscriptionWasCalled);
    }

    [Test]
    public void RxActionUtilities_MakeObservableFromAction_IsCancellable()
    {
        ManualResetEvent evt = new ManualResetEvent(false);

        // Timeout of this test if sth. goes wrong below
        Task.Factory.StartNew(() =>
        {
            Thread.Sleep(5000);
            Console.WriteLine("Test timed out!");
            evt.Set();
        });

        int threadIdOfAction = -42;
        int threadIdOfSubscriptionContect = -43;
        bool subscriptionWasCalled = false;
        bool actionTerminated = false;

        Action action = () =>
        {
            threadIdOfAction = Thread.CurrentThread.ManagedThreadId;

            for (int i = 0; i < 10; ++i)
            {
                Console.WriteLine("Some action #" + i);
                Thread.Sleep(200);
            }

            actionTerminated = true;
            evt.Set();
        };

        var observable = RxActionUtilities.MakeObservable_2(action);

        threadIdOfSubscriptionContect = Thread.CurrentThread.ManagedThreadId;
        Console.WriteLine("Before subscription on thread " + threadIdOfSubscriptionContect);

        var subscription =
            observable.SubscribeOn(TaskPoolScheduler.Default).Subscribe(
                (unit) =>
                {
                    Console.WriteLine("Subscription: OnNext " + threadIdOfAction + ", " + threadIdOfSubscriptionContect);
                    subscriptionWasCalled = true;
                },
                (ex) => evt.Set(), () => evt.Set());

        Console.WriteLine("After subscription");

        Thread.Sleep(1000);
        Console.WriteLine("Killing subscription ...");
        subscription.Dispose();
        Console.WriteLine("... done.");

        evt.WaitOne();

        Assert.IsFalse(actionTerminated);
        Assert.AreNotEqual(-42, threadIdOfAction);
        Assert.AreNotEqual(-43, threadIdOfSubscriptionContect);
        Assert.AreEqual(threadIdOfAction, threadIdOfSubscriptionContect);
        Assert.That(subscriptionWasCalled);
    }

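Update

In response to Lee's elaborate answer, I am giving it another try and reformulating my question. IIUC, we can summarize it as follows:

- An action that has already started cannot be stopped from the outside.
- I completely misunderstood Dispatcher.CurrentDispatcher and how it works. AFAICS it should not be used as an argument to SubscribeOn(), but only as an argument to ObserveOn().
- I misunderstood the CurrentThreadScheduler.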
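
To make the first point concrete outside of Rx, here is a minimal sketch that uses only the BCL (System.Threading and System.Threading.Tasks). The names `CooperativeCancellationSketch` and `RunUntilCancelled` are made up for illustration and are not part of Rx or of my utility class. It only shows that a running action stops because it checks a token itself, not because anything interrupts it.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class CooperativeCancellationSketch
{
    // Hypothetical helper: performs up to maxIterations steps and checks the
    // token before each step. Returns how many steps were actually completed.
    public static int RunUntilCancelled(CancellationToken token, int maxIterations, int stepMilliseconds)
    {
        int completed = 0;
        for (int i = 0; i < maxIterations; ++i)
        {
            if (token.IsCancellationRequested)
            {
                break; // the action itself decides to stop
            }
            Thread.Sleep(stepMilliseconds);
            completed++;
        }
        return completed;
    }

    public static void Main()
    {
        using (var cts = new CancellationTokenSource())
        {
            // Start the action on a thread-pool thread.
            Task<int> worker = Task.Run(() => RunUntilCancelled(cts.Token, 10, 200));

            Thread.Sleep(500);
            cts.Cancel(); // only a request; the loop notices it at its next check

            int completed = worker.Result;
            Console.WriteLine("Completed " + completed + " of 10 iterations before stopping.");
        }
    }
}
```

The exact number of completed iterations depends on timing, so the sketch prints it rather than asserting a fixed value.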
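
To create something cancellable, we need an action that is aware of cancellation, e.g. by using Action<CancellationToken>. Here is my next attempt. Please tell me whether you think this implementation fits well into the Rx framework, or whether it can be improved further: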

    public static IObservable<Unit>
        MakeObservable(Action<CancellationToken> action, IScheduler scheduler)
    {
        return Observable.Create<Unit>(
            observer
            =>
            {
                // internally creates a new CancellationTokenSource
                var cancel = new CancellationDisposable();

                var scheduledAction = scheduler.Schedule(() =>
                {
                    try
                    {
                        action(cancel.Token);
                        observer.OnCompleted();
                    }
                    catch (Exception ex)
                    {
                        observer.OnError(ex);
                    }
                });

                // Cancellation before execution of action is performed
                // by disposing scheduledAction
                // Cancellation during execution of action is performed
                // by disposing cancel
                return new CompositeDisposable(cancel, scheduledAction);
            });
    }

And while you're at it: I couldn't figure out how to test this using TestSchedulers:

    [Test]
    public void MakeObservableFromCancelableAction_CancellationTakesPlaceWithTrueThread()
    {
        var scheduler = NewThreadScheduler.Default;

        Action<CancellationToken> action =
            (cancellationToken) =>
            {
                for (int i = 0; i < 10; ++i)
                {
                    Console.WriteLine("Some action #" + i);

                    if (cancellationToken.IsCancellationRequested)
                    {
                        break;
                    }

                    Thread.Sleep(20);

                    // Hoping that the disposal of the subscription stops
                    // the loop before we reach i == 4.
                    Assert.Less(i, 4);
                }
            };

        var observable = RxActionUtilities.MakeObservable(action, scheduler);

        var subscription = observable.Subscribe((unit) => { });

        Thread.Sleep(60);

        subscription.Dispose();
    }