2

私はリアクティブ拡張機能を初めて使用し、それを(c#で)使用して、インターリーブされた複数のストリームを含むファイルを読み取りたいと思います。基本的に、ファイルはの形式ですABCDABCDABCD...。ファイルを順番に読み取り、ストリーム(つまりAAA..BBB..など)を分離し、ストリームごとに別々のスレッドを使用して、各ストリームを並列に処理することをお勧めします。

各ストリームが可能な限りビジー状態を維持できるようにするために、何らかの形式のバッファリングが必要になります(もちろん制限内)。すべてのストリームが必ずしも同時に開始するわけではありません。その場合、遅延ストリームのために多くの要素をスキップする必要があります。この場合、バッファリングによってギャップが埋められる可能性があります。

ファイル内の要素は小さい(4バイト)ので、かなりおしゃべりです。したがって、私はこれに効率的に対処する方法も探しています。

私は、ファイルを読み取るための列挙型を作成することから始めました。これは、ストリームIDを含む構造体を提供するために作成することも、順序(ストリームの数を法とする要素番号)に基づいてストリームを分離することもできます。ただし、後者の方がおそらく効率的です。

4

2 に答える 2

3

この質問には、特にパフォーマンスと効率について話しているときに、「状況によって異なります」と刻印されていますが、多少工夫された例が示されています。つまり、サンプルファイルは実際のファイルに比べて非常に単純です。しかし、私はそれが有用であるという偶然の機会についていくつかのアドバイスを提供しようとします。

ストリームをに変換する方法は次のとおりEnumerable<char>です。ストリームはバッファリングを適用します。これにより、一度に1つの結果が返されます。これは(データのチャンクを送り返すために)より効率的にすることができますが、ある時点で一度に1つずつ処理する必要があり、ここにある場合もあります。時期尚早に最適化しないでください。

IEnumerable<char> ReadBytes(Stream stream)
{
    using (StreamReader reader = new StreamReader(stream))
    {
        while (!reader.EndOfStream)
            yield return (char)reader.Read();
    }
}

ここで、これが「出力」オブザーバブルの処理コードであるとしましょう。まず、出力オブザーバブルを設定してから、必要に応じてサブスクライブします。ここでは配列を使用しているため、出力される監視可能なインデックスは配列インデックスであることに注意してください。ストリームインデックスをゼロベースのインデックスに変換できない場合は、辞書を使用することもできます。

var outputs = Enumerable.Repeat(0, 3).Select(_ => new Subject<char>()).ToArray();                                                                                                     

outputs[0].Delay(TimeSpan.FromSeconds(2)).Subscribe(x => Console.WriteLine("hi: {0}", x));
outputs[1].Delay(TimeSpan.FromSeconds(1)).Subscribe(x => Console.WriteLine("ho: {0}", x));
outputs[2].Subscribe(x => Console.WriteLine("he: {0}", x));

Subject<char>を使用して要素を送信していることに注意してください。これは要素のタイプによって異なりますが、char与えられた例で機能します。すべてが機能していることを証明するためだけに要素を遅らせることにも注意してください。それらは現在独立したストリームであり、あなたはそれらを使って好きなことをすることができます。

OK、ファイルストリームが与えられた場合:

var file = @"C:\test.txt";
var buffer = 32;
var stream = new FileStream(file, FileMode.Open, FileAccess.Read, FileShare.Read, buffer);

これで、サブスクライブしてモジュロインデックスを使用して、適切な出力ストリームに送信できます。

ReadBytes(stream)
.ToObservable(Scheduler.ThreadPool)
.Select((x,i) => new { Key = (i % 3), Value = x }) // you can change it up here
.Subscribe(x => outputs[x.Key].OnNext(x.Value));

ターゲットストリームを正確に計算する方法に応じて、ここには潜在的により効率的な方法がありますが、考え方は同じです。

入力ファイルには次の1行しか含まれていません。ABCABCABCABCABCABC

プログラムの実行からの出力は次のとおりです。

he: C
he: C
he: C
he: C
he: C
he: C

1秒後:

ho: B
ho: B
ho: B
ho: B
ho: B
ho: B

そしてもう1秒:

hi: A
hi: A
hi: A
hi: A
hi: A
hi: A
于 2012-06-21T22:04:29.767 に答える
1

以下は、衙門の答えに基づいた私の解決策です。正しく機能しているように見えます。つまり、シーケンシャルインターリーブ入力は、並列に処理される(マルチスレッド)複数のシーケンシャルストリームに分割されます。

ただし、これが適切な実装であるかどうかはわかりません(プログラミングスタイル、rxコントラクトなどの点で)。

const int MAX_BUFFERED_ELEMENTS = 1024;

// number of streams in the file
var numberOfStreams = 8;

// semaphore to limit buffered elements
var semaphore = new SemaphoreSlim(MAX_BUFFERED_ELEMENTS);
var cts = new CancellationTokenSource(); // should be used to cancel (left out of this sample)

// create subjects that are the base of each output stream
var subjects = Enumerable.Repeat(0, numberOfStreams).Select(_ => new Subject<ElementType>()).ToArray();

// create the source stream (reader is IEnumerable<ElementType>)
var observable = reader.ToObservable(Scheduler.ThreadPool).Publish();

// forward elements from source to the output subjects
int stream = 0;
observable.Subscribe(x => { 
    semaphores.Wait(cts.Token);   // wait if buffer is full
    _subjects[stream].OnNext(x);  // forward to output stream
    if (++stream >= numberOfStreams) stream = 0; }); // stream = stream++ % numberOfStreams

// build output streams
subjects.Select(
    (s,i) => s.ObserveOn(Scheduler.ThreadPool) // process on separate threads
    .Do(_ => semaphore.Release())              // signal that element is consumed
    .Subscribe(x => Console.WriteLine("stream: {0}\t element: {1}", i, x)) // debug 'processing'
    );

// start processing!
observable.Connect();
于 2012-06-23T02:59:08.590 に答える