インターリーブされたストリームがあり、それを別々のシーケンシャルストリームに分割します。
プロデューサー
int streamCount = 3;
new MyIEnumerable<ElementType>()
.ToObservable(Scheduler.ThreadPool)
.Select((x,i) => new { Key = (i % streamCount), Value = x })
.Subscribe(x => outputs[x.Key].OnNext(x.Value));
outputs[]
ストリームを処理するサブジェクトはどこにありますか。以下に定義されています。は.ObserveOn()
、ストリームを同時に処理するために使用されます(マルチスレッド)。
消費者
var outputs = Enumerable.Repeat(0, streamCount).Select(_ => new Subject<char>()).ToArray();
outputs[0].ObserveOn(Scheduler.ThreadPool).Subscribe(x => Console.WriteLine("stream 0: {0}", x));
outputs[1].ObserveOn(Scheduler.ThreadPool).Subscribe(x => Console.WriteLine("stream 1: {0}", x));
outputs[2].ObserveOn(Scheduler.ThreadPool).Subscribe(x => Console.WriteLine("stream 2: {0}", x));
このコードの問題は、出力ストリームが追いつかない場合でも、列挙可能なもの全体を可能な限り高速に読み取ることです。私の場合、列挙可能なのはファイルストリームであるため、これにより大量のメモリが使用される可能性があります。したがって、バッファがあるしきい値に達した場合は、読み取りをブロックしたいと思います。