1

NuGet の最新の Rx2 を使用しています。以下のコード例では、サブスクリプションは新しいスレッドで処理され、新しい値の生成をブロックしません。それはすべて良いことです。しかし、オブザーバブルによって生成された値が新しいスレッドで順番に処理されるのはなぜですか?

static void Main(string[] args)
{
  printThreadId("Started");
  var observable = Observable.Generate<int, int>(
      0,
      x => {
        printThreadId("Comparing " + x);
        return x < 5; },
      x => {
        printThreadId("Incrementing " + x);
        return x + 1; },
      x => {
        printThreadId("Selecting " + x);
        return x; },
      NewThreadScheduler.Default
  );

  var disp = observable
    .ObserveOn(NewThreadScheduler.Default)
    .Subscribe(state =>
    {
      printThreadId("Processing " + state);
      Thread.Sleep(1000);
    });

  Console.ReadLine();
  disp.Dispose();
}

static Action<string> printThreadId = (prefix) => Console.WriteLine("{0} on [Worker.{1}]", prefix, Thread.CurrentThread.ManagedThreadId);

上記のコードが完了するまでに約 5 秒かかります。各アクションが新しいスレッドで実行されることを期待していましたが、これは起こっていません。

以下のコードは新しいタスクをスケジュールし、プロセス全体が 1 秒強で完了します。

var disp = observable
 .ObserveOn(NewThreadScheduler.Default)
 .Subscribe(state =>
 {
   printThreadId("Processing " + state);
   NewThreadScheduler.Default.Schedule(() =>
   {
      printThreadId("Long running task " + state);
      Thread.Sleep(1000);
   });
 });

ObserveOnandを使用して長時間実行されるタスクを同時に実行するようにスケジュールするより良い方法はありSubscribeますか?

4

1 に答える 1

3

リアクティブ フレームワークは、後続の値がオーバーラップなしでシリアル化されることを明確に保証します。これにより、多くの同時実行の問題を防ぐことができます。この動作は仕様です。

アクションはNewThreadSchedulerキューに入れられるので、連続してスケジュールされたアクションがある場合、スケジューラは新しいスレッドの作成を気にせず、現在のスレッドを使用するだけです。ギャップがある場合 (何かがキューに入れられている可能性がありますが、それは将来のためです)、現在のスレッドは終了することが許可され、アクションの期限が来ると新しいスレッドが作成されます。

サブスクリプションを並行して実行したい場合は、それを実現するために最善を尽くす必要があります。しかし、良いニュースは、それほど難しくないということです。

既存のものを保持する場合は、observableこれを試すことができます:

Func<int, IObservable<Unit>> process = state =>
    Observable.Start(() =>
    {
        printThreadId("Processing " + state);
        Thread.Sleep(1000);
        return Unit.Default;
    });

var query =
    from x in observable
    select process(x);

var disp =
    query
        .Merge()
        .ObserveOn(NewThreadScheduler.Default)
        .Subscribe();

実行したところ、次の結果が得られました。

Started on [Worker.23]
Comparing 0 on [Worker.26]
Selecting 0 on [Worker.26]
Processing 0 on [Worker.20]
Incrementing 0 on [Worker.26]
Comparing 1 on [Worker.26]
Selecting 1 on [Worker.26]
Incrementing 1 on [Worker.26]
Comparing 2 on [Worker.26]
Selecting 2 on [Worker.26]
Processing 1 on [Worker.18]
Incrementing 2 on [Worker.26]
Comparing 3 on [Worker.26]
Selecting 3 on [Worker.26]
Processing 3 on [Worker.13]
Incrementing 3 on [Worker.26]
Comparing 4 on [Worker.26]
Selecting 4 on [Worker.26]
Processing 4 on [Worker.12]
Incrementing 4 on [Worker.26]
Comparing 5 on [Worker.26]
Processing 2 on [Worker.8]
于 2012-10-30T04:06:04.647 に答える