7

私はここで苦労しています。普段は本を読んでいますが、まだ本はありません。RXを使用してストリームを読み取ることに関するさまざまな例を無数に見つけましたが、頭を動かすのは非常に難しいと感じています。

Observable.FromAsyncPatternを使用して、StreamのBeginRead/EndReadまたはBeginReadLine/EndReadLineメソッドのラッパーを作成できることはわかっています。

しかし、これは1回だけ読み取ります-最初のオブザーバーがサブスクライブするとき。

ストリームがエラーになるか終了するまでOnNextを読み取り、ポンピングし続けるObservableが必要です。

これに加えて、そのオブザーバブルを複数のサブスクライバーと共有して、すべてのサブスクライバーがアイテムを取得できるようにする方法も知りたいです。

4

4 に答える 4

5

Repeatストリームの最後まで行を読み続けたり、複数のリーダー間での共有を制御しPublishたりするために使用できます。Replay

任意のストリームから最後まで行を読み取るためのシンプルで完全な Rx ソリューションの例は次のようになります。

public static IObservable<string> ReadLines(Stream stream)
{
    return Observable.Using(
        () => new StreamReader(stream),
        reader => Observable.FromAsync(reader.ReadLineAsync)
                            .Repeat()
                            .TakeWhile(line => line != null));
}

このソリューションは、ストリームの最後に達したときにがReadLine戻るという事実も利用しています。null

于 2016-11-15T21:10:34.993 に答える
4

rxxを使用して、Leeの答えに追加します。

using (new FileStream(@"filename.txt", FileMode.Open)
       .ReadToEndObservable()
       .Subscribe(x => Console.WriteLine(x.Length)))
{
  Console.ReadKey();
}

読み取りバッファの長さが出力されます。

于 2013-01-22T13:37:45.440 に答える
1

えー、ここで私の他の答えの1つを再利用するつもりです(とにかく、その一部です):

参照:NetworkStreamから読み取るとバッファが破損します

その中で、私はこのような拡張メソッドを持っています:

public static class Ext
{        
    public static IObservable<byte[]> ReadObservable(this Stream stream, int bufferSize)
    {        
        // to hold read data
        var buffer = new byte[bufferSize];
        // Step 1: async signature => observable factory
        var asyncRead = Observable.FromAsyncPattern<byte[], int, int, int>(
            stream.BeginRead, 
            stream.EndRead);
        return Observable.While(
            // while there is data to be read
            () => stream.CanRead, 
            // iteratively invoke the observable factory, which will
            // "recreate" it such that it will start from the current
            // stream position - hence "0" for offset
            Observable.Defer(() => asyncRead(buffer, 0, bufferSize))
                .Select(readBytes => buffer.Take(readBytes).ToArray()));
    }
}

次のような形式で記述されているように、おそらくこれを使用できます。

// Note: ToEnumerable works here because your filestream
// has a finite length - don't do this with infinite streams!
var blobboData  = stream
     .ReadObservable(bufferSize)
     // take while we're still reading data
     .TakeWhile(returnBuffer => returnBuffer.Length > 0)
     .ToEnumerable()
     // mash them all together
     .SelectMany(buffer => buffer)
     .ToArray();
于 2013-01-22T17:06:43.067 に答える
1

解決策は Observable.Create を使用することです

これは、あらゆる種類のストリームの読み取りに適応できる例です

    public static IConnectableObservable<Command> GetReadObservable(this CommandReader reader)
    {

       return Observable.Create<Command>(async (subject, token) =>
        {


            try
            {

                while (true)
                {

                    if (token.IsCancellationRequested)
                    {
                        subject.OnCompleted();
                        return;
                    }

                    //this part here can be changed to something like this
                    //int received = await Task.Factory.FromAsync<int>(innerSocket.BeginReceive(data, offset, size, SocketFlags.None, null, null), innerSocket.EndReceive);

                    Command cmd = await reader.ReadCommandAsync();

                    subject.OnNext(cmd);

                }

            }

            catch (Exception ex)
            {
                try
                {
                    subject.OnError(ex);
                }
                catch (Exception)
                {
                    Debug.WriteLine("An exception was thrown while trying to call OnError on the observable subject -- means you're not catching exceptions everywhere");
                    throw;
                }
            }

        }).Publish();

    }

返された IConnectableObservable で Connect() を呼び出すことを忘れないでください

于 2013-02-12T21:27:34.680 に答える