4

以下のクラスを遅延してファイルを返すように変換しようとしています。

public class ObservableFile2 : IObservable<string>
{
    private readonly IObservable<string> subject;

    public ObservableFile2(string fileName)
    {
        subject = Observable.Using<string, StreamReader>
            (
                () => new StreamReader(new FileStream(fileName, FileMode.Open, FileAccess.Read, FileShare.Read)),
                streamReader => ObserveLines(streamReader)
            );
    }

    private IObservable<string> ObserveLines(StreamReader streamReader)
    {
        return ReadLines(streamReader).ToObservable();
    }

    private IEnumerable<string> ReadLines(StreamReader streamReader)
    {
        while (!streamReader.EndOfStream)
        {
            yield return streamReader.ReadLine();
        }
    }        

    public IDisposable Subscribe(IObserver<string> observer)
    {
        return subject.Subscribe(observer);
    }
}

私は今、これを変換して使用しようとしています

StreamReader.ReadLineAsync() 

またはさらに良いのは、データをチャンクすることです。

await SourceStream.ReadAsync(buffer, 0, chunkSize). 

タスクのラップとアンラップの方法を把握していないようです

支援を歓迎します。

ありがとう

4

2 に答える 2

8

私はRxマスターではないので、私の答えよりも良い方法があるかもしれません.

async-enabledを使用すると、これが可能になるはずCreateです。

public static class ObservableFile2
{
  public static IObservable<string> Create(string fileName)
  {
    return Observable.Create<string>(async (subject, token) =>
    {
      try
      {
        using (var streamReader = new StreamReader(new FileStream(fileName, FileMode.Open, FileAccess.Read, FileShare.Read))
        {
          while (true)
          {
            token.ThrowIfCancellationRequested();
            var line = await streamReader.ReadLineAsync();
            if (line == null)
            {
              subject.OnCompleted();
              return;
            }
            subject.OnNext(line);
          }
        }
      }
      catch (Exception ex)
      {
        subject.OnError(ex);
      }
    });
  }
}
于 2012-10-31T15:44:32.680 に答える
0

I don't know if the async-features are needed for your solution as you did not mention it - all I can see is that you want to consume the file "lazily" - my guess is that you want to get the lines one at a time and if so this one should do the trick:

public static IEnumerable<string> EnumerateLines(string fileName)
{
    using (
        var streamReader =
            new StreamReader(new FileStream(fileName,
                                            FileMode.Open,
                                            FileAccess.Read,
                                            FileShare.Read)))
    {
        while (true)
        {
            if (streamReader.EndOfStream)
                yield break;

            Console.WriteLine("read another line...");
            yield return streamReader.ReadLine();
        }
    }
}

please note that this depends just on the implementation-details of StreamReader.ReadLine. You could try to make even this non-strict evalutated by using Lazy - but then you get into trouble with disopsing the file-handles as you won't know when the values are really getting consumed (even Haskell has this problem :) ) - my advice: don't try to be to lazy with files ... this can get you easy into trouble

于 2012-11-06T09:20:43.227 に答える