15

1日あたり最大30GBのGZippedログファイルがあります。各ファイルは100.000行を保持し、圧縮すると6〜8MBになります。解析ロジックが削除された簡略化されたコードは、Parallel.ForEachループを利用します。

処理されるラインの最大数は、2 NUMAノード、32論理CPUボックス(Intel Xeon E7-2820 @ 2 GHz)でMaxDegreeOfParallelismの8でピークになります。

using System;

using System.Collections.Concurrent;

using System.Linq;
using System.IO;
using System.IO.Compression;

using System.Threading.Tasks;

namespace ParallelLineCount
{
    public class ScriptMain
    {
        static void Main(String[] args)
        {
            int    maxMaxDOP      = (args.Length > 0) ? Convert.ToInt16(args[0]) : 2;
            string fileLocation   = (args.Length > 1) ? args[1] : "C:\\Temp\\SomeFiles" ;
            string filePattern    = (args.Length > 1) ? args[2] : "*2012-10-30.*.gz";
            string fileNamePrefix = (args.Length > 1) ? args[3] : "LineCounts";

            Console.WriteLine("Start:                 {0}", DateTime.UtcNow.ToString("yyyy-MM-ddTHH:mm:ss.fffffffZ"));
            Console.WriteLine("Processing file(s):    {0}", filePattern);
            Console.WriteLine("Max MaxDOP to be used: {0}", maxMaxDOP.ToString());
            Console.WriteLine("");

            Console.WriteLine("MaxDOP,FilesProcessed,ProcessingTime[ms],BytesProcessed,LinesRead,SomeBookLines,LinesPer[ms],BytesPer[ms]");

            for (int maxDOP = 1; maxDOP <= maxMaxDOP; maxDOP++)
            {

                // Construct ConcurrentStacks for resulting strings and counters
                ConcurrentStack<Int64> TotalLines = new ConcurrentStack<Int64>();
                ConcurrentStack<Int64> TotalSomeBookLines = new ConcurrentStack<Int64>();
                ConcurrentStack<Int64> TotalLength = new ConcurrentStack<Int64>();
                ConcurrentStack<int>   TotalFiles = new ConcurrentStack<int>();

                DateTime FullStartTime = DateTime.Now;

                string[] files = System.IO.Directory.GetFiles(fileLocation, filePattern);

                var options = new ParallelOptions() { MaxDegreeOfParallelism = maxDOP };

                //  Method signature: Parallel.ForEach(IEnumerable<TSource> source, Action<TSource> body)
                Parallel.ForEach(files, options, currentFile =>
                    {
                        string filename = System.IO.Path.GetFileName(currentFile);
                        DateTime fileStartTime = DateTime.Now;

                        using (FileStream inFile = File.Open(fileLocation + "\\" + filename, FileMode.Open))
                        {
                            Int64 lines = 0, someBookLines = 0, length = 0;
                            String line = "";

                            using (var reader = new StreamReader(new GZipStream(inFile, CompressionMode.Decompress)))
                            {
                                while (!reader.EndOfStream)
                                {
                                    line = reader.ReadLine();
                                    lines++; // total lines
                                    length += line.Length;  // total line length

                                    if (line.Contains("book")) someBookLines++; // some special lines that need to be parsed later
                                }

                                TotalLines.Push(lines); TotalSomeBookLines.Push(someBookLines); TotalLength.Push(length);
                                TotalFiles.Push(1); // silly way to count processed files :)
                            }
                        }
                    }
                );

                TimeSpan runningTime = DateTime.Now - FullStartTime;

                // Console.WriteLine("MaxDOP,FilesProcessed,ProcessingTime[ms],BytesProcessed,LinesRead,SomeBookLines,LinesPer[ms],BytesPer[ms]");
                Console.WriteLine("{0},{1},{2},{3},{4},{5},{6},{7}",
                    maxDOP.ToString(),
                    TotalFiles.Sum().ToString(),
                    Convert.ToInt32(runningTime.TotalMilliseconds).ToString(),
                    TotalLength.Sum().ToString(),
                    TotalLines.Sum(),
                    TotalSomeBookLines.Sum().ToString(),
                    Convert.ToInt64(TotalLines.Sum() / runningTime.TotalMilliseconds).ToString(),
                    Convert.ToInt64(TotalLength.Sum() / runningTime.TotalMilliseconds).ToString());

            }
            Console.WriteLine();
            Console.WriteLine("Finish:                " + DateTime.UtcNow.ToString("yyyy-MM-ddTHH:mm:ss.fffffffZ"));
        }
    }
}

これが結果の要約であり、MaxDegreeOfParallelism=8に明確なピークがあります。

ここに画像の説明を入力してください

CPU負荷(ここに集約して示されています。DOPが20〜30の範囲にある場合でも、ほとんどの負荷は単一のNUMAノードにありました):

ここに画像の説明を入力してください

CPU負荷が95%を超えるようにする唯一の方法は、ファイルを4つの異なるフォルダーに分割し、同じコマンドを4回実行することでした。各フォルダーは、すべてのファイルのサブセットを対象としています。

誰かがボトルネックを見つけることができますか?

4

4 に答える 4

9

1 つの問題は、既定のFileStreamコンストラクターで使用されるバッファー サイズが小さいことである可能性があります。より大きな入力バッファを使用することをお勧めします。そのような:

using (FileStream infile = new FileStream(
    name, FileMode.Open, FileAccess.Read, FileShare.None, 65536))

デフォルトのバッファ サイズは 4 キロバイトで、スレッドはバッファを埋めるために I/O サブシステムに多くの呼び出しを行います。64K のバッファーは、これらの呼び出しの頻度が大幅に低下することを意味します。

32K から 256K の間のバッファー サイズが最高のパフォーマンスを提供することがわかりました。しばらく前に詳細なテストを行ったとき、64K が「スイート スポット」でした。バッファ サイズが 256K を超えると、実際にはパフォーマンスが低下し始めます。

また、これがパフォーマンスに大きな影響を与える可能性は低いですが、これらのConcurrentStackインスタンスを 64 ビット整数に置き換えて、Interlocked.AddまたはInterlocked.Increment更新する必要があります。コードが簡素化され、コレクションを管理する必要がなくなります。

アップデート:

問題の説明を読み直したところ、次のステートメントに感銘を受けました。

CPU 負荷が 95% を超える唯一の方法は、ファイルを 4 つの異なるフォルダーに分割し、同じコマンドを 4 回実行することでした。各コマンドは、すべてのファイルのサブセットを対象としています。

これは、ファイルを開く際のボトルネックを示しています。OS がディレクトリに対して相互排他ロックを使用しているかのように。また、すべてのデータがキャッシュにあり、物理 I/O が必要ない場合でも、プロセスはこのロックを待機する必要があります。ファイル システムがディスクに書き込んでいる可能性もあります。ファイルが開かれるたびに、ファイルの最終アクセス時刻を更新する必要があることに注意してください。

I/O が本当にボトルネックである場合は、ファイルをロードし、それらを類似のデータ構造に詰め込むだけの単一のスレッドをBlockingCollection用意して、処理中のスレッドがロックのために互いに競合する必要がないようにすることを検討できます。ディレクトリ。アプリケーションは、1 つのプロデューサーと N 人のコンシューマーを持つプロデューサー/コンシューマー アプリケーションになります。

于 2012-11-01T13:30:27.877 に答える
2

ディスク読み取りの並列化が役に立っているとは思いません。実際、これは同時に複数のストレージ領域からの読み取りで競合を引き起こし、パフォーマンスに深刻な影響を与える可能性があります.

プログラムを再構成して、生のファイル データをバイト [] のメモリ ストリームにシングル スレッドで読み取るようにします。次に、各ストリームまたはバッファーで Parallel.ForEach() を実行して、行を解凍してカウントします。

最初の IO 読み取りヒットを前もって取得しますが、OS/ハードウェアに、できればほとんどシーケンシャルな読み取りを最適化させてから、解凍してメモリ内で解析します。

decomprless、Encoding.UTF8.ToString()、String.Split() などの操作は大量のメモリを使用するため、不要になった古いバッファへの参照をクリーンアップまたは破棄することに注意してください。

この方法で機械に深刻な廃棄物を発生させることができないとしたら、私は驚かれることでしょう。

お役に立てれば。

于 2013-01-03T08:24:35.673 に答える
2

この理由は通常、スレッドの同期が多すぎるためです。

コードで同期を探していると、コレクションで頻繁に同期が行われていることがわかります。あなたのスレッドは行を個別にプッシュしています。これは、各行で、せいぜいインターロック操作が発生し、最悪の場合、カーネル モードのロック待機が発生することを意味します。すべてのスレッドが現在の行をコレクションに取得するために競合するため、インターロックされた操作は激しく競合します。それらはすべて同じメモリ位置を更新しようとします。これにより、キャッシュ ラインの ping が発生します。

これを変更して、行をより大きなチャンクにプッシュします。100 行以上のラインアレイをプッシュします。多ければ多いほどよい。

つまり、最初にスレッド ローカル コレクションで結果を収集し、グローバル結果にマージされることはめったにありません。

手動でのデータのプッシュを完全に取り除きたいと思うかもしれません。これが PLINQ の目的です: データの同時ストリーミング。PLINQ は、すべての同時コレクション操作を適切な方法で抽象化します。

于 2012-11-01T12:58:11.187 に答える