どのコードでも非同期 I/O API を利用していません。あなたがしていることはすべて CPU バウンドであり、すべての I/O 操作は CPU リソースのブロックを浪費します。AsParallel
はコンピューティング バウンド タスク用です。非同期 I/O を利用したい場合は、現在 v4.0 以下で非同期プログラミング モデル (APM) ベースの API を活用する必要があります。これは、使用している I/O ベースのクラスのメソッドを探し、BeginXXX/EndXXX
利用可能な場合はそれらを利用することによって行われます。
初心者向けのこの投稿をお読みください: TPL TaskFactory.FromAsync とブロッキング メソッドを使用したタスク
AsParallel
次に、とにかくこの場合は使いたくないでしょう。AsParallel
ストリーミングを有効にすると、アイテムごとに新しいタスクがすぐにスケジュールされますが、ここでは必要ありません。を使用して作業を分割することで、はるかに優れたサービスを提供できますParallel::ForEach
。
この知識を使用して、特定のケースで最大の同時実行性を達成する方法を見てみましょう。
var refs = GetReferencesFromDB();
// Using Parallel::ForEach here will partition and process your data on separate worker threads
Parallel.ForEach(
refs,
ref =>
{
string filePath = GetFilePath(ref);
byte[] fileDataBuffer = new byte[1048576];
// Need to use FileStream API directly so we can enable async I/O
FileStream sourceFileStream = new FileStream(
filePath,
FileMode.Open,
FileAccess.Read,
FileShare.Read,
8192,
true);
// Use FromAsync to read the data from the file
Task<int> readSourceFileStreamTask = Task.Factory.FromAsync(
sourceFileStream.BeginRead
sourceFileStream.EndRead
fileDataBuffer,
fileDataBuffer.Length,
null);
// Add a continuation that will fire when the async read is completed
readSourceFileStreamTask.ContinueWith(readSourceFileStreamAntecedent =>
{
int soureFileStreamBytesRead;
try
{
// Determine exactly how many bytes were read
// NOTE: this will propagate any potential exception that may have occurred in EndRead
sourceFileStreamBytesRead = readSourceFileStreamAntecedent.Result;
}
finally
{
// Always clean up the source stream
sourceFileStream.Close();
sourceFileStream = null;
}
// This is here to make sure you don't end up trying to read files larger than this sample code can handle
if(sourceFileStreamBytesRead == fileDataBuffer.Length)
{
throw new NotSupportedException("You need to implement reading files larger than 1MB. :P");
}
// Convert the file data to a string
string html = Encoding.UTF8.GetString(fileDataBuffer, 0, sourceFileStreamBytesRead);
// Parse the HTML
string convertedHtml = ParseHtml(html);
// This is here to make sure you don't end up trying to write files larger than this sample code can handle
if(Encoding.UTF8.GetByteCount > fileDataBuffer.Length)
{
throw new NotSupportedException("You need to implement writing files larger than 1MB. :P");
}
// Convert the file data back to bytes for writing
Encoding.UTF8.GetBytes(convertedHtml, 0, convertedHtml.Length, fileDataBuffer, 0);
// Need to use FileStream API directly so we can enable async I/O
FileStream destinationFileStream = new FileStream(
destinationFilePath,
FileMode.OpenOrCreate,
FileAccess.Write,
FileShare.None,
8192,
true);
// Use FromAsync to read the data from the file
Task destinationFileStreamWriteTask = Task.Factory.FromAsync(
destinationFileStream.BeginWrite,
destinationFileStream.EndWrite,
fileDataBuffer,
0,
fileDataBuffer.Length,
null);
// Add a continuation that will fire when the async write is completed
destinationFileStreamWriteTask.ContinueWith(destinationFileStreamWriteAntecedent =>
{
try
{
// NOTE: we call wait here to observe any potential exceptions that might have occurred in EndWrite
destinationFileStreamWriteAntecedent.Wait();
}
finally
{
// Always close the destination file stream
destinationFileStream.Close();
destinationFileStream = null;
}
},
TaskContinuationOptions.AttachedToParent);
// Send to external system **concurrent** to writing to destination file system above
SendToWs(ref, convertedHtml);
},
TaskContinuationOptions.AttachedToParent);
});
さて、ここにいくつかの注意事項があります:
- これはサンプル コードなので、ファイルの読み取り/書き込みに 1MB のバッファーを使用しています。これは、HTML ファイルでは過剰であり、システム リソースを浪費します。最大のニーズに合わせてそれを下げるか、チェーンされた読み取り/書き込みを StringBuilder に実装することができます。:P
- 私が持っている読み取り/書き込みタスクの続きであることに気付くでしょう
TaskContinuationOptions.AttachedToParent
。Parallel::ForEach
これは、基になるすべての非同期呼び出しが完了するまで、作業を開始するワーカー スレッドが完了しないようにするため、非常に重要です。これがなければ、5000 項目すべての作業を同時に開始することになり、数千のスケジュールされたタスクで TPL サブシステムが汚染され、適切にスケーリングされなくなります。
- ここで、ファイル共有へのファイルの書き込みと同時に SendToWs を呼び出します。SendToWs の実装の根底にあるものはわかりませんが、これも非同期にする良い候補のように思えます。現時点では、純粋な計算作業であると想定されているため、実行中に CPU スレッドが消費されます。演習として、私が示したものを活用してスループットを向上させる最善の方法を見つけてください。
- これはすべて型付けされた自由形式であり、私の脳はここで唯一のコンパイラであり、構文が適切であることを確認するために使用したのは SO の構文強調表示だけです。したがって、構文エラーがあればご容赦ください。頭も尻尾もわからないほどひどく失敗した場合はお知らせください。フォローアップします。