この質問には、特にパフォーマンスと効率について話しているときに、「状況によって異なります」と刻印されていますが、多少工夫された例が示されています。つまり、サンプルファイルは実際のファイルに比べて非常に単純です。しかし、私はそれが有用であるという偶然の機会についていくつかのアドバイスを提供しようとします。
ストリームをに変換する方法は次のとおりEnumerable<char>
です。ストリームはバッファリングを適用します。これにより、一度に1つの結果が返されます。これは(データのチャンクを送り返すために)より効率的にすることができますが、ある時点で一度に1つずつ処理する必要があり、ここにある場合もあります。時期尚早に最適化しないでください。
IEnumerable<char> ReadBytes(Stream stream)
{
using (StreamReader reader = new StreamReader(stream))
{
while (!reader.EndOfStream)
yield return (char)reader.Read();
}
}
ここで、これが「出力」オブザーバブルの処理コードであるとしましょう。まず、出力オブザーバブルを設定してから、必要に応じてサブスクライブします。ここでは配列を使用しているため、出力される監視可能なインデックスは配列インデックスであることに注意してください。ストリームインデックスをゼロベースのインデックスに変換できない場合は、辞書を使用することもできます。
var outputs = Enumerable.Repeat(0, 3).Select(_ => new Subject<char>()).ToArray();
outputs[0].Delay(TimeSpan.FromSeconds(2)).Subscribe(x => Console.WriteLine("hi: {0}", x));
outputs[1].Delay(TimeSpan.FromSeconds(1)).Subscribe(x => Console.WriteLine("ho: {0}", x));
outputs[2].Subscribe(x => Console.WriteLine("he: {0}", x));
Subject<char>
を使用して要素を送信していることに注意してください。これは要素のタイプによって異なりますが、char
与えられた例で機能します。すべてが機能していることを証明するためだけに要素を遅らせることにも注意してください。それらは現在独立したストリームであり、あなたはそれらを使って好きなことをすることができます。
OK、ファイルストリームが与えられた場合:
var file = @"C:\test.txt";
var buffer = 32;
var stream = new FileStream(file, FileMode.Open, FileAccess.Read, FileShare.Read, buffer);
これで、サブスクライブしてモジュロインデックスを使用して、適切な出力ストリームに送信できます。
ReadBytes(stream)
.ToObservable(Scheduler.ThreadPool)
.Select((x,i) => new { Key = (i % 3), Value = x }) // you can change it up here
.Subscribe(x => outputs[x.Key].OnNext(x.Value));
ターゲットストリームを正確に計算する方法に応じて、ここには潜在的により効率的な方法がありますが、考え方は同じです。
入力ファイルには次の1行しか含まれていません。ABCABCABCABCABCABC
プログラムの実行からの出力は次のとおりです。
he: C
he: C
he: C
he: C
he: C
he: C
1秒後:
ho: B
ho: B
ho: B
ho: B
ho: B
ho: B
そしてもう1秒:
hi: A
hi: A
hi: A
hi: A
hi: A
hi: A