4

インターリーブされたストリームがあり、それを別々のシーケンシャルストリームに分割します

プロデューサー

int streamCount = 3;

new MyIEnumerable<ElementType>()
.ToObservable(Scheduler.ThreadPool)
.Select((x,i) => new { Key = (i % streamCount), Value = x })
.Subscribe(x => outputs[x.Key].OnNext(x.Value));

outputs[]ストリームを処理するサブジェクトはどこにありますか。以下に定義されています。は.ObserveOn()、ストリームを同時に処理するために使用されます(マルチスレッド)。

消費者

var outputs = Enumerable.Repeat(0, streamCount).Select(_ => new Subject<char>()).ToArray();                                                                                                     

outputs[0].ObserveOn(Scheduler.ThreadPool).Subscribe(x => Console.WriteLine("stream 0: {0}", x));
outputs[1].ObserveOn(Scheduler.ThreadPool).Subscribe(x => Console.WriteLine("stream 1: {0}", x));
outputs[2].ObserveOn(Scheduler.ThreadPool).Subscribe(x => Console.WriteLine("stream 2: {0}", x));

このコードの問題は、出力ストリームが追いつかない場合でも、列挙可能なもの全体を可能な限り高速に読み取ることです。私の場合、列挙可能なのはファイルストリームであるため、これにより大量のメモリが使用される可能性があります。したがって、バッファがあるしきい値に達した場合は、読み取りをブロックしたいと思います。

4

2 に答える 2

3

以下に示すように、プロデューサーとコンシューマーでセマフォを使用してこれを解決しました。ただし、これが良い解決策と見なされるかどうかはわかりません (Rx コントラクト、プログラミング スタイルなどに関して)。

var semaphore = new SemaphoreSlim(MAX_BUFFERED_ELEMENTS);

// add to producer (before subscribe)
.Do(_ => semaphore.Wait());

// add to consumer (before subscribe)
.Do(_ => semaphore.Release()))

CancelationTokenの呼び出しにa を渡してWait()、ストリームが異常停止したときにキャンセルされるようにすることをお勧めします。

于 2012-06-23T10:46:45.243 に答える
2

あなたの解決策は非常に合理的だと思います。最大の問題 (前の質問の背景があります) は、ソリューションの「内部」が現在どこにでも公開されていることです。これを適切にコーディングするときは、次のものをクリーンアップするようにしてください。

  • 単一のメソッドを公開するクラスにすべてをラップしIDisposable Subscribe(<index>, Action)ますIObservable<element> ToObservable(<index>))。返されたサブスクリプションまたは返されたオブザーバブルには、すべての「作業」、つまり追加されたDoアクションなどが既に行われています。その下に辞書またはリストがあるという事実は、ユーザーにはまったく関係がないはずです。そうでない場合、ここでコードを変更すると、すべての場所で変更が必要になります。

  • Aは素晴らしいアイデアです。 または のいずれかCancelationTokenで必ずキャンセルしてください。これは、 へのオーバーロードを使用して行うことができます。OnCompletedOnErrorDo

于 2012-06-23T22:49:43.700 に答える