NuGet の最新の Rx2 を使用しています。以下のコード例では、サブスクリプションは新しいスレッドで処理され、新しい値の生成をブロックしません。それはすべて良いことです。しかし、オブザーバブルによって生成された値が新しいスレッドで順番に処理されるのはなぜですか?
static void Main(string[] args)
{
printThreadId("Started");
var observable = Observable.Generate<int, int>(
0,
x => {
printThreadId("Comparing " + x);
return x < 5; },
x => {
printThreadId("Incrementing " + x);
return x + 1; },
x => {
printThreadId("Selecting " + x);
return x; },
NewThreadScheduler.Default
);
var disp = observable
.ObserveOn(NewThreadScheduler.Default)
.Subscribe(state =>
{
printThreadId("Processing " + state);
Thread.Sleep(1000);
});
Console.ReadLine();
disp.Dispose();
}
static Action<string> printThreadId = (prefix) => Console.WriteLine("{0} on [Worker.{1}]", prefix, Thread.CurrentThread.ManagedThreadId);
上記のコードが完了するまでに約 5 秒かかります。各アクションが新しいスレッドで実行されることを期待していましたが、これは起こっていません。
以下のコードは新しいタスクをスケジュールし、プロセス全体が 1 秒強で完了します。
var disp = observable
.ObserveOn(NewThreadScheduler.Default)
.Subscribe(state =>
{
printThreadId("Processing " + state);
NewThreadScheduler.Default.Schedule(() =>
{
printThreadId("Long running task " + state);
Thread.Sleep(1000);
});
});
ObserveOn
andを使用して長時間実行されるタスクを同時に実行するようにスケジュールするより良い方法はありSubscribe
ますか?