0

サーバーからデータをリッスンして書き込むtcpリスナーがあります。を使用しBlockingCollectionてデータを保存しました。ここでは、ファイルがいつ終了するかわかりません。したがって、私のファイルストリームは常に開いています。

私のコードの一部は次のとおりです。

private static BlockingCollection<string> Buffer = new   BlockingCollection<string>();

Process()

{
 var consumer = Task.Factory.StartNew(() =>WriteData());
 while()

 {
  string request = await reader.ReadLineAsync();
  Buffer.Add(request);
 }
} 

WriteData()
{
  FileStream fStream = new FileStream(filename,FileMode.Append,FileAccess.Write,FileShare.Write, 16392);

 foreach(var val in Buffer.GetConsumingEnumerable(token))
 {

 fStream.Write(Encoding.UTF8.GetBytes(val), 0, val.Length);
                            fStream.Flush();
 }

}

問題は、ループ内でファイルストリームを破棄できないことです。そうしないと、各行のファイルストリームを作成する必要があり、ループが終了しない可能性があります。

4

1 に答える 1

0

DataFlow ActionBlockを使用した場合、これは .NET 4.5 ではるかに簡単になります。ActionBlock は、受信メッセージを受け入れてバッファリングし、1 つ以上のタスクを使用して非同期的に処理します。

次のように書くことができます。

public static async Task ProcessFile(string sourceFileName,string targetFileName)
{
    //Pass the target stream as part of the message to avoid globals
    var block = new ActionBlock<Tuple<string, FileStream>>(async tuple =>
    {
        var line = tuple.Item1;
        var stream = tuple.Item2;
        await stream.WriteAsync(Encoding.UTF8.GetBytes(line), 0, line.Length);
    });


    //Post lines to block
    using (var targetStream = new FileStream(targetFileName, FileMode.Append, 
                                   FileAccess.Write, FileShare.Write, 16392))
    {
        using (var sourceStream = File.OpenRead(sourceFileName))
        {
            await PostLines(sourceStream, targetStream, block);
        }
        //Tell the block we are done
        block.Complete();
        //And wait fo it to finish
        await block.Completion;
    }

}

private static async Task PostLines(FileStream sourceStream, FileStream targetStream, 
                                    ActionBlock<Tuple<string, FileStream>> block)
{
    using (var reader = new StreamReader(sourceStream))
    {
        while (true)
        {
            var line = await reader.ReadLineAsync();
            if (line == null)
                break;
            var tuple = Tuple.Create(line, targetStream);
            block.Post(tuple);
        }
    }
}

ほとんどのコードは、各行の読み取りとブロックへの投稿を処理します。デフォルトでは、ActionBlock は一度に 1 つのメッセージを処理するために 1 つのタスクのみを使用しますが、このシナリオではこれで問題ありません。データを並行して処理する必要がある場合は、さらに多くのタスクを使用できます。

すべての行が読み取られたら、 への呼び出しでブロックに通知し、Completeで処理が完了するのを待ちawait block.Completionます。

ブロックのCompletionタスクが完了したら、ターゲット ストリームを閉じることができます。

DataFlow ライブラリの優れた点は、複数のブロックをリンクして、処理ステップのパイプラインを作成できることです。ActionBlock は通常、このようなチェーンの最後のステップです。ライブラリは、あるブロックから次のブロックにデータを渡し、完了をチェーンに伝達するように注意します。

たとえば、1 つのステップでログからファイルを読み取り、2 つ目のステップでそれらを正規表現で解析して特定のパターン (エラー メッセージなど) を見つけて渡し、3 つ目のステップでエラー メッセージを受信して​​別のファイルに書き込むことができます。各ステップは異なるスレッドで実行され、中間メッセージは各ステップでバッファリングされます。

于 2015-04-23T15:47:28.207 に答える