1日あたり最大30GBのGZippedログファイルがあります。各ファイルは100.000行を保持し、圧縮すると6〜8MBになります。解析ロジックが削除された簡略化されたコードは、Parallel.ForEachループを利用します。
処理されるラインの最大数は、2 NUMAノード、32論理CPUボックス(Intel Xeon E7-2820 @ 2 GHz)でMaxDegreeOfParallelismの8でピークになります。
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.IO;
using System.IO.Compression;
using System.Threading.Tasks;
namespace ParallelLineCount
{
public class ScriptMain
{
static void Main(String[] args)
{
int maxMaxDOP = (args.Length > 0) ? Convert.ToInt16(args[0]) : 2;
string fileLocation = (args.Length > 1) ? args[1] : "C:\\Temp\\SomeFiles" ;
string filePattern = (args.Length > 1) ? args[2] : "*2012-10-30.*.gz";
string fileNamePrefix = (args.Length > 1) ? args[3] : "LineCounts";
Console.WriteLine("Start: {0}", DateTime.UtcNow.ToString("yyyy-MM-ddTHH:mm:ss.fffffffZ"));
Console.WriteLine("Processing file(s): {0}", filePattern);
Console.WriteLine("Max MaxDOP to be used: {0}", maxMaxDOP.ToString());
Console.WriteLine("");
Console.WriteLine("MaxDOP,FilesProcessed,ProcessingTime[ms],BytesProcessed,LinesRead,SomeBookLines,LinesPer[ms],BytesPer[ms]");
for (int maxDOP = 1; maxDOP <= maxMaxDOP; maxDOP++)
{
// Construct ConcurrentStacks for resulting strings and counters
ConcurrentStack<Int64> TotalLines = new ConcurrentStack<Int64>();
ConcurrentStack<Int64> TotalSomeBookLines = new ConcurrentStack<Int64>();
ConcurrentStack<Int64> TotalLength = new ConcurrentStack<Int64>();
ConcurrentStack<int> TotalFiles = new ConcurrentStack<int>();
DateTime FullStartTime = DateTime.Now;
string[] files = System.IO.Directory.GetFiles(fileLocation, filePattern);
var options = new ParallelOptions() { MaxDegreeOfParallelism = maxDOP };
// Method signature: Parallel.ForEach(IEnumerable<TSource> source, Action<TSource> body)
Parallel.ForEach(files, options, currentFile =>
{
string filename = System.IO.Path.GetFileName(currentFile);
DateTime fileStartTime = DateTime.Now;
using (FileStream inFile = File.Open(fileLocation + "\\" + filename, FileMode.Open))
{
Int64 lines = 0, someBookLines = 0, length = 0;
String line = "";
using (var reader = new StreamReader(new GZipStream(inFile, CompressionMode.Decompress)))
{
while (!reader.EndOfStream)
{
line = reader.ReadLine();
lines++; // total lines
length += line.Length; // total line length
if (line.Contains("book")) someBookLines++; // some special lines that need to be parsed later
}
TotalLines.Push(lines); TotalSomeBookLines.Push(someBookLines); TotalLength.Push(length);
TotalFiles.Push(1); // silly way to count processed files :)
}
}
}
);
TimeSpan runningTime = DateTime.Now - FullStartTime;
// Console.WriteLine("MaxDOP,FilesProcessed,ProcessingTime[ms],BytesProcessed,LinesRead,SomeBookLines,LinesPer[ms],BytesPer[ms]");
Console.WriteLine("{0},{1},{2},{3},{4},{5},{6},{7}",
maxDOP.ToString(),
TotalFiles.Sum().ToString(),
Convert.ToInt32(runningTime.TotalMilliseconds).ToString(),
TotalLength.Sum().ToString(),
TotalLines.Sum(),
TotalSomeBookLines.Sum().ToString(),
Convert.ToInt64(TotalLines.Sum() / runningTime.TotalMilliseconds).ToString(),
Convert.ToInt64(TotalLength.Sum() / runningTime.TotalMilliseconds).ToString());
}
Console.WriteLine();
Console.WriteLine("Finish: " + DateTime.UtcNow.ToString("yyyy-MM-ddTHH:mm:ss.fffffffZ"));
}
}
}
これが結果の要約であり、MaxDegreeOfParallelism=8に明確なピークがあります。
CPU負荷(ここに集約して示されています。DOPが20〜30の範囲にある場合でも、ほとんどの負荷は単一のNUMAノードにありました):
CPU負荷が95%を超えるようにする唯一の方法は、ファイルを4つの異なるフォルダーに分割し、同じコマンドを4回実行することでした。各フォルダーは、すべてのファイルのサブセットを対象としています。
誰かがボトルネックを見つけることができますか?