1

Rx初心者です。ドキュメントによると、各値の OnNext ハンドラーを異なるスレッドで並行して実行できるようです。ObserveOn の適切なスケジューラーの実装が必要なだけです。

しかし、この単純なコードは同期的に実行されます。

var dict = new Dictionary<int, object> {{1, null}, {5, null}, {6, null}};

dict.ToObservable()
  .SubscribeOn(ThreadPoolScheduler.Default)
  .Subscribe(kv => {
     Console.WriteLine("Thread {0}, key {1}", Thread.CurrentThread.ManagedThreadId, kv.Key);
     Thread.Sleep(1000);
  });

画面には、反復ごとに同じスレッド ID が表示されます。ここで何が欠けていますか?

4

1 に答える 1

3

Rx は、特定の Observer の観点からOnNext がオーバーラップすることを具体的に禁止する文法を定義します。ストリームへの複数のサブスクライバーを並行して呼び出すことができます (ただし、これは Rx オペレーターの実装者によって異なります)。

ここでは、同じストリームから異なるレートで OnNext を処理する 2 つのサブスクライバーがあります。

void Main()
{
    var stream = Observable.Interval(TimeSpan.FromSeconds(1));    

    var sub1 = stream.Subscribe(x => {
        Console.WriteLine("Sub1 handler start: " + Thread.CurrentThread.ManagedThreadId);
        Thread.Sleep(4000);
        Console.WriteLine("Sub1 handler end");
    });

    var sub2 = stream.Subscribe(x => {
        Console.WriteLine("Sub2 handler start: " + Thread.CurrentThread.ManagedThreadId);
        Thread.Sleep(2000);
        Console.WriteLine("Sub2 handler end");
    });

    Console.ReadLine();
}

これが出力です。Sub2 が Sub1 よりも先に進んでおり、それぞれが独自のスレッド上にあることを確認してください。

Sub2 handler start: 18
Sub1 handler start: 12
Sub2 handler end
Sub2 handler start: 18
Sub2 handler end
Sub2 handler start: 18
Sub1 handler end
Sub1 handler start: 12
Sub2 handler end
Sub2 handler start: 18
Sub2 handler end
Sub1 handler end
Sub2 handler start: 18
Sub1 handler start: 12

各サブスクリプションが独自のスレッドを取得するということは何もないことに注意してください。これは、スケジューラとオペレーターの実装方法にかかっています。OnNext* (OnError | OnCompleted)の Rx 文法に準拠している限り、何でも構いません。

あなたの特定のシナリオでは、PLINQ / TPL を調べます。Rx よりも適しているように感じます。

ところで、Lee Campbell のwww.introtorx.comは、始めたばかりの場合に役立つリソースです。

于 2013-10-23T22:38:54.343 に答える