23

大量のデータを処理する必要があるコンソール アプリケーションを構築しています。

基本的に、アプリケーションは DB から参照を取得します。参照ごとに、ファイルの内容を解析し、いくつかの変更を加えます。ファイルは HTML ファイルであり、プロセスは RegEx 置換 (参照を見つけてリンクに変換する) で重い作業を行っています。結果はファイル システムに保存され、外部システムに送信されます。

プロセスを順次再開すると、次のようになります。

var refs = GetReferencesFromDB(); // ~5000 Datarow returned
foreach(var ref in refs)
{
    var filePath = GetFilePath(ref); // This method looks up in a previously loaded file list
    var html = File.ReadAllText(filePath); // Read html locally, or from a network drive
    var convertedHtml = ParseHtml(html);
    File.WriteAllText(destinationFilePath); // Copy the result locally, or a network drive
    SendToWs(ref, convertedHtml);
}

私のプログラムは正しく動作していますが、非常に遅いです。そのため、プロセスを並列化したいのです。

ここまでで、 AsParallel を追加して単純な並列化を作成しました。

var refs = GetReferencesFromDB().AsParallel(); 
refs.ForAll(ref=>
{
    var filePath = GetFilePath(ref); 
    var html = File.ReadAllText(filePath); 
    var convertedHtml = ParseHtml(html);
    File.WriteAllText(destinationFilePath); 
    SendToWs(ref, convertedHtml);
});

この単純な変更により、プロセスの所要時間が短縮されます (25% 短縮)。ただし、並列化について私が理解していることは、I/O が魔法のように 2 倍になるわけではないため、I/O に依存するリソースを並列化する場合、あまりメリットがない (または、さらに悪いことにメリットが少ない) ということです。

そのため、プロセス全体を並列化するのではなく、依存関係のあるチェーン キュー タスクを作成するようにアプローチを変更する必要があると思います。

IE、次のようなフローを作成する必要があります。

キュー読み取りファイル。終了したら、Queue ParseHtml. 完了すると、Queue は WS に送信し、ローカルに書き込みます。終了したら、結果をログに記録します。

しかし、私はそのような考えを実現する方法を知りません。

コンシューマー/プロデューサー キューのセットで終了する気がしますが、正しいサンプルが見つかりませんでした。

さらに、メリットがあるかどうかもわかりません。

アドバイスをありがとう

[編集]実際、私は C# 4.5 を使用するのに最適な候補です... もしそれが rtm だったら:)

[編集 2]正しく並列化されていないと思わせるもう 1 つのことは、リソース モニターで、CPU、ネットワーク I/O、およびディスク I/O のグラフが安定していないことです。1 つが高い場合、他は低から中程度

4

5 に答える 5

17

どのコードでも非同期 I/O API を利用していません。あなたがしていることはすべて CPU バウンドであり、すべての I/O 操作は CPU リソースのブロックを浪費します。AsParallelはコンピューティング バウンド タスク用です。非同期 I/O を利用したい場合は、現在 v4.0 以下で非同期プログラミング モデル (APM) ベースの API を活用する必要があります。これは、使用している I/O ベースのクラスのメソッドを探し、BeginXXX/EndXXX利用可能な場合はそれらを利用することによって行われます。

初心者向けのこの投稿をお読みください: TPL TaskFactory.FromAsync とブロッキング メソッドを使用したタスク

AsParallel次に、とにかくこの場合は使いたくないでしょう。AsParallelストリーミングを有効にすると、アイテムごとに新しいタスクがすぐにスケジュールされますが、ここでは必要ありません。を使用して作業を分割することで、はるかに優れたサービスを提供できますParallel::ForEach

この知識を使用して、特定のケースで最大の同時実行性を達成する方法を見てみましょう。

var refs = GetReferencesFromDB();

// Using Parallel::ForEach here will partition and process your data on separate worker threads
Parallel.ForEach(
    refs,
    ref =>
{ 
    string filePath = GetFilePath(ref);

    byte[] fileDataBuffer = new byte[1048576];

    // Need to use FileStream API directly so we can enable async I/O
    FileStream sourceFileStream = new FileStream(
                                      filePath, 
                                      FileMode.Open,
                                      FileAccess.Read,
                                      FileShare.Read,
                                      8192,
                                      true);

    // Use FromAsync to read the data from the file
    Task<int> readSourceFileStreamTask = Task.Factory.FromAsync(
                                             sourceFileStream.BeginRead
                                             sourceFileStream.EndRead
                                             fileDataBuffer,
                                             fileDataBuffer.Length,
                                             null);

    // Add a continuation that will fire when the async read is completed
    readSourceFileStreamTask.ContinueWith(readSourceFileStreamAntecedent =>
    {
        int soureFileStreamBytesRead;

        try
        {
            // Determine exactly how many bytes were read 
            // NOTE: this will propagate any potential exception that may have occurred in EndRead
            sourceFileStreamBytesRead = readSourceFileStreamAntecedent.Result;
        }
        finally
        {
            // Always clean up the source stream
            sourceFileStream.Close();
            sourceFileStream = null;
        }

        // This is here to make sure you don't end up trying to read files larger than this sample code can handle
        if(sourceFileStreamBytesRead == fileDataBuffer.Length)
        {
            throw new NotSupportedException("You need to implement reading files larger than 1MB. :P");
        }

        // Convert the file data to a string
        string html = Encoding.UTF8.GetString(fileDataBuffer, 0, sourceFileStreamBytesRead);

        // Parse the HTML
        string convertedHtml = ParseHtml(html);

        // This is here to make sure you don't end up trying to write files larger than this sample code can handle
        if(Encoding.UTF8.GetByteCount > fileDataBuffer.Length)
        {
            throw new NotSupportedException("You need to implement writing files larger than 1MB. :P");
        }

        // Convert the file data back to bytes for writing
        Encoding.UTF8.GetBytes(convertedHtml, 0, convertedHtml.Length, fileDataBuffer, 0);

        // Need to use FileStream API directly so we can enable async I/O
        FileStream destinationFileStream = new FileStream(
                                               destinationFilePath,
                                               FileMode.OpenOrCreate,
                                               FileAccess.Write,
                                               FileShare.None,
                                               8192,
                                               true);

        // Use FromAsync to read the data from the file
        Task destinationFileStreamWriteTask = Task.Factory.FromAsync(
                                                  destinationFileStream.BeginWrite,
                                                  destinationFileStream.EndWrite,
                                                  fileDataBuffer,
                                                  0,
                                                  fileDataBuffer.Length,
                                                  null);

        // Add a continuation that will fire when the async write is completed
        destinationFileStreamWriteTask.ContinueWith(destinationFileStreamWriteAntecedent =>
        {
            try
            {
                // NOTE: we call wait here to observe any potential exceptions that might have occurred in EndWrite
                destinationFileStreamWriteAntecedent.Wait();
            }
            finally
            {
                // Always close the destination file stream
                destinationFileStream.Close();
                destinationFileStream = null;
            }
        },
        TaskContinuationOptions.AttachedToParent);

        // Send to external system **concurrent** to writing to destination file system above
        SendToWs(ref, convertedHtml);
    },
    TaskContinuationOptions.AttachedToParent);
});

さて、ここにいくつかの注意事項があります:

  1. これはサンプル コードなので、ファイルの読み取り/書き込みに 1MB のバッファーを使用しています。これは、HTML ファイルでは過剰であり、システム リソースを浪費します。最大のニーズに合わせてそれを下げるか、チェーンされた読み取り/書き込みを StringBuilder に実装することができます。:P
  2. 私が持っている読み取り/書き込みタスクの続きであることに気付くでしょうTaskContinuationOptions.AttachedToParentParallel::ForEachこれは、基になるすべての非同期呼び出しが完了するまで、作業を開始するワーカー スレッドが完了しないようにするため、非常に重要です。これがなければ、5000 項目すべての作業を同時に開始することになり、数千のスケジュールされたタスクで TPL サブシステムが汚染され、適切にスケーリングされなくなります。
  3. ここで、ファイル共有へのファイルの書き込みと同時に SendToWs を呼び出します。SendToWs の実装の根底にあるものはわかりませんが、これも非同期にする良い候補のように思えます。現時点では、純粋な計算作業であると想定されているため、実行中に CPU スレッドが消費されます。演習として、私が示したものを活用してスループットを向上させる最善の方法を見つけてください。
  4. これはすべて型付けされた自由形式であり、私の脳はここで唯一のコンパイラであり、構文が適切であることを確認するために使用したのは SO の構文強調表示だけです。したがって、構文エラーがあればご容赦ください。頭も尻尾もわからないほどひどく失敗した場合はお知らせください。フォローアップします。
于 2011-12-15T19:58:31.823 に答える
5

幸いなことに、作成したロジックは、生産者と消費者のパイプラインに入るステップに簡単に分離できます。

  • ステップ 1: ファイルの読み取り
  • ステップ 2: ファイルを解析する
  • ステップ 3: ファイルの書き込み
  • ステップ 4: SendToWs

.NET 4.0 を使用している場合は、BlockingCollectionデータ構造を各ステップのプロデューサー/コンシューマー キューのバックボーンとして使用できます。メイン スレッドは、各作業項目をステップ 1 のキューに入れ、そこでピックアップして処理し、ステップ 2 のキューに転送します。

Async CTPに移行する場合は、新しいTPL データフロー構造を利用することもできます。BufferBlock<T>とりわけ、 newおよびキーワードと同様に動作し、BlockingCollection適切に統合されるデータ構造があります。asyncawait

アルゴリズムは IO バウンドであるため、生産者と消費者の戦略では、求めているパフォーマンスの向上が得られない可能性がありますが、少なくとも、IO スループットを向上させることができれば、適切に拡張できる非常に洗練されたソリューションが得られます。ステップ 1 と 3 がボトルネックになり、パイプラインのバランスが取れなくなるのではないかと心配していますが、試してみる価値はあります。

于 2011-12-14T14:19:12.230 に答える
3

単なる提案ですが、消費者/生産者のパターンを調べましたか?特定の数のスレッドがディスク上のファイルを読み取り、コンテンツをキューに送ります。次に、コンシューマーと呼ばれる別のスレッドセットが、キューがいっぱいになるとキューを「消費」します。http://zone.ni.com/devzone/cda/tut/p/id/3023

于 2011-12-14T14:12:39.890 に答える
2

この種のシナリオでの最善の策は、間違いなく生産者/消費者モデルです。データをプルするための1つのスレッドと、データを処理するための多数のワーカー。I / Oを回避する簡単な方法はないため、計算自体の最適化に集中することもできます。

モデルをスケッチしてみましょう。

// producer thread
var refs = GetReferencesFromDB(); // ~5000 Datarow returned

foreach(var ref in refs)
{
    lock(queue)
    {   
       queue.Enqueue(ref);
       event.Set();
    }

    // if the queue is limited, test if the queue is full and wait.
}

// consumer threads
while(true)
{
    value = null;
    lock(queue)
    {
       if(queue.Count > 0)
       {
           value = queue.Dequeue();
       }
    }        

    if(value != null) 
       // process value
    else        
       event.WaitOne(); // event to signal that an item was placed in the queue.           
}

プロデューサー/コンシューマーの詳細については、C#でのスレッド化のパート4を参照してください:http://www.albahari.com/threading/part4.aspx

于 2011-12-14T14:12:16.740 に答える
0

ファイルのリストを分割し、各ファイルを 1 つのバッチで処理するアプローチは問題ないと思います。私の感覚では、並列度を調整してプレイすると、パフォーマンスが向上する可能性があります。参照:var refs = GetReferencesFromDB().AsParallel().WithDegreeOfParallelism(16);これにより、同時に 16 個のファイルの処理が開始されます。現在、所有しているコアの数に応じて、おそらく 2 つまたは 4 つのファイルを処理しています。これは、IO を使用せずに計算のみを行う場合にのみ効率的です。IO 集中型タスクの場合、調整により、プロセッサーのアイドル時間が大幅に短縮され、パフォーマンスが大幅に向上する可能性があります。

プロデューサー/コンシューマーを使用してタスクを分割して結合する場合は、次のサンプルをご覧ください: Using Parallel Linq Extensions to union two sequences, how can one can yield the best results first?

于 2011-12-14T14:40:01.947 に答える