Rx は、特定の Observer の観点からOnNext がオーバーラップすることを具体的に禁止する文法を定義します。ストリームへの複数のサブスクライバーを並行して呼び出すことができます (ただし、これは Rx オペレーターの実装者によって異なります)。
ここでは、同じストリームから異なるレートで OnNext を処理する 2 つのサブスクライバーがあります。
void Main()
{
var stream = Observable.Interval(TimeSpan.FromSeconds(1));
var sub1 = stream.Subscribe(x => {
Console.WriteLine("Sub1 handler start: " + Thread.CurrentThread.ManagedThreadId);
Thread.Sleep(4000);
Console.WriteLine("Sub1 handler end");
});
var sub2 = stream.Subscribe(x => {
Console.WriteLine("Sub2 handler start: " + Thread.CurrentThread.ManagedThreadId);
Thread.Sleep(2000);
Console.WriteLine("Sub2 handler end");
});
Console.ReadLine();
}
これが出力です。Sub2 が Sub1 よりも先に進んでおり、それぞれが独自のスレッド上にあることを確認してください。
Sub2 handler start: 18
Sub1 handler start: 12
Sub2 handler end
Sub2 handler start: 18
Sub2 handler end
Sub2 handler start: 18
Sub1 handler end
Sub1 handler start: 12
Sub2 handler end
Sub2 handler start: 18
Sub2 handler end
Sub1 handler end
Sub2 handler start: 18
Sub1 handler start: 12
各サブスクリプションが独自のスレッドを取得するということは何もないことに注意してください。これは、スケジューラとオペレーターの実装方法にかかっています。OnNext* (OnError | OnCompleted)の Rx 文法に準拠している限り、何でも構いません。
あなたの特定のシナリオでは、PLINQ / TPL を調べます。Rx よりも適しているように感じます。
ところで、Lee Campbell のwww.introtorx.comは、始めたばかりの場合に役立つリソースです。