DataFlow ActionBlockを使用した場合、これは .NET 4.5 ではるかに簡単になります。ActionBlock は、受信メッセージを受け入れてバッファリングし、1 つ以上のタスクを使用して非同期的に処理します。
次のように書くことができます。
public static async Task ProcessFile(string sourceFileName,string targetFileName)
{
//Pass the target stream as part of the message to avoid globals
var block = new ActionBlock<Tuple<string, FileStream>>(async tuple =>
{
var line = tuple.Item1;
var stream = tuple.Item2;
await stream.WriteAsync(Encoding.UTF8.GetBytes(line), 0, line.Length);
});
//Post lines to block
using (var targetStream = new FileStream(targetFileName, FileMode.Append,
FileAccess.Write, FileShare.Write, 16392))
{
using (var sourceStream = File.OpenRead(sourceFileName))
{
await PostLines(sourceStream, targetStream, block);
}
//Tell the block we are done
block.Complete();
//And wait fo it to finish
await block.Completion;
}
}
private static async Task PostLines(FileStream sourceStream, FileStream targetStream,
ActionBlock<Tuple<string, FileStream>> block)
{
using (var reader = new StreamReader(sourceStream))
{
while (true)
{
var line = await reader.ReadLineAsync();
if (line == null)
break;
var tuple = Tuple.Create(line, targetStream);
block.Post(tuple);
}
}
}
ほとんどのコードは、各行の読み取りとブロックへの投稿を処理します。デフォルトでは、ActionBlock は一度に 1 つのメッセージを処理するために 1 つのタスクのみを使用しますが、このシナリオではこれで問題ありません。データを並行して処理する必要がある場合は、さらに多くのタスクを使用できます。
すべての行が読み取られたら、 への呼び出しでブロックに通知し、Complete
で処理が完了するのを待ちawait block.Completion
ます。
ブロックのCompletion
タスクが完了したら、ターゲット ストリームを閉じることができます。
DataFlow ライブラリの優れた点は、複数のブロックをリンクして、処理ステップのパイプラインを作成できることです。ActionBlock は通常、このようなチェーンの最後のステップです。ライブラリは、あるブロックから次のブロックにデータを渡し、完了をチェーンに伝達するように注意します。
たとえば、1 つのステップでログからファイルを読み取り、2 つ目のステップでそれらを正規表現で解析して特定のパターン (エラー メッセージなど) を見つけて渡し、3 つ目のステップでエラー メッセージを受信して別のファイルに書き込むことができます。各ステップは異なるスレッドで実行され、中間メッセージは各ステップでバッファリングされます。