9

私たちのアプリケーションの一部として (現在約 4 か月間本番環境にあります)、IObservable に変換する外部デバイスからのデータ ストリームがあります。

これまで、以下を使用して生成してきましたが、非常にうまく機能しています。

IObservable<string> ObserveStringStream(Stream inputStream)
{
    var streamReader = new StreamReader(inputStream);
    return Observable
            .Create<string>(observer => Scheduler.ThreadPool
            .Schedule(() => ReadLoop(streamReader, observer)));
}

private void ReadLoop(StreamReader reader, IObserver<string> observer)
{
    while (true)
    {
        try
        {
            var line = reader.ReadLine();
            if (line != null)
            {
                observer.OnNext(line);
            }
            else
            {
                observer.OnCompleted();
                break;
            }
        }
        catch (Exception ex)
        {
            observer.OnError(ex);
            break;
        }
    }
}

昨夜、yield return構文を使用して同じ結果を達成する方法があるかどうか疑問に思い、これを思いつきました:

IObservable<string> ObserveStringStream(Stream inputStream)
{
    var streamReader = new StreamReader(inputStream);
    return ReadLoop(streamReader)
            .ToObservable(Scheduler.ThreadPool);
}

private IEnumerable<string> ReadLoop(StreamReader reader)
{
    while (true)
    {
        var line = reader.ReadLine();
        if (line != null)
        {
            yield return line;
        }
        else
        {
            yield break;
        }
    }
}

それは非常にうまく機能しているようで、はるかにクリーンですが、一方の方法に別の方法の長所または短所があるかどうか、または完全に優れた方法があるかどうか疑問に思っていました.

4

3 に答える 3

14

私はあなたがそこに良い考えを持っていると思います(thenに変えStreamてください)。ただし、IEnumerableのコードはより簡潔にすることができます。Enumerable<string>IObservable<string>

IEnumerable<string> ReadLines(Stream stream)
{
    using (StreamReader reader = new StreamReader(stream))
    {
        while (!reader.EndOfStream)
            yield return reader.ReadLine();
    }
}

そしてIObservableの場合:

IObservable<string> ObserveLines(Stream inputStream)
{
    return ReadLines(inputStream).ToObservable(Scheduler.ThreadPool);
}

これはより短く、より読みやすく、ストリームを適切に破棄します。怠け者でもあります。

ToObservable拡張機能は、OnNextイベントOnCompleted(改行) とイベント (列挙可能の終わり) およびOnError.

于 2012-04-12T07:36:35.283 に答える
2

手元にコードはありませんが、非同期プレ非同期 CTP を実行する方法は次のとおりです。

[スキムリーダーへの注意: あまりスケーリングする必要がない場合は気にする必要はありません]

それ自体が Observable である AsyncTextReader 実装を作成します。ctor は Stream を受け取り、そのストリームに対して BeginRead(256bytes) を実行し、自分自身を継続として渡してから返します。

継続に入ったら、EndRead を呼び出し、返されたバイトをクラスの小さなバッファーに追加します。バッファーに 1 つ以上の行末シーケンスが含まれるまで、これを繰り返します (TextWriter に従って)。これが発生すると、Observable インターフェイスを介して、バッファのビットを文字列として送信し、繰り返します。

終了したら、OnComplete などを通知します (そしてストリームを破棄します)。継続で EndReadByte から例外がスローされた場合は、それをキャッチして OnError インターフェイスに渡します。

呼び出しコードは次のようになります。

IObservable = 新しい AsyncTextReader(ストリーム);

これはうまくスケーリングします。バッファ処理であまりにもばかげたことをしないようにする必要があります。

擬似コード:

public ctor(Stream stream){
    this._stream = stream;
    BeginRead();
    return;
}

private void BeginRead(){
    // kick of async read and return (synchronously)
    this._stream.BeginRead(_buffer,0,256,EndRead,this);
}

private void EndRead(IAsyncResult result){
    try{
        // bytesRead will be *up to* 256
        var bytesRead = this._stream.EndRead(result);
        if(bytesRead < 1){
            OnCompleted();
            return;
        }
        // do work with _buffer, _listOfBuffers
        // to get lines out etc...
        OnNext(aLineIFound); // times n
        BeginRead(); // go round again
    }catch(Exception err){
        OnException(err);
    }
}

わかりました、これは APM であり、母親だけが愛することができるものです。私は代替案を切望しています。

ps: 読者がストリームを閉じるべきかどうかは興味深い質問です。それはそれを作成しなかったので、私はノーと言います。

于 2012-04-13T15:29:00.600 に答える
1

async/await がサポートされている場合、次のことが最善の策となる可能性が最も高くなります。

IObservable<string> ObserveStringStream(Stream inputStream)
{
    return Observable.Using(() => new StreamReader(inputStream), 
        sr => Observable.Create<string>(async (obs, ct) =>
        {
            while (true)
            {
                ct.ThrowIfCancellationRequested();
                var line = await sr.ReadLineAsync().ConfigureAwait(false);
                if (line == null)
                    break;
                obs.OnNext(line);
            }
            obs.OnCompleted();
    }));
}
于 2015-11-16T14:02:06.400 に答える