短いテスト プロジェクトを作成し、いくつかの異なるアプローチを試しました。私の目標は、シーケンシャル コードだけを使用して、27 列 (id,A,B,C,...,Z) と NumOfRows 約 300,000 の DataTable をできるだけ早く構築することでした。
(各行には ID が入力され、残りの列にはランダムな 5 文字の単語が入力されます)。
4 回目の試行で、Object 型の値の配列に基づいてテーブルに行を追加するための別の構文に出くわしました。(ここを参照)。
あなたの場合、それは次のようになります:
cityTable.Rows.Add( new Object[] {
((City)e.DatabaseEntry).Id ,
ObjectThatGoesInColumn2 ,
ObjectThatGoesInColumn3 ,
ObjectThatGoesInLastColumn
}
それ以外の:
DataRow row = cityTable.NewRow();
row[0] = 100;
row["City Name"] = Anaheim;
row["Column 7"] = ...
...
row["Column 26"] = checksum;
workTable.Rows.Add( row );
これにより、各列を一度に 1 つずつ個別に設定する必要がないため、速度が向上します。また、プロファイラーの写真に基づいて、個別に設定していた列が少なくとも 12 列あります。
これにより、列名の文字列をハッシュして、処理している配列の位置を確認し、データ型が正しいことを再確認する必要がなくなります。
興味がある場合は、ここに私のテスト プロジェクトがあります。
class Program
{
public static System.Data.DataSet dataSet;
public static System.Data.DataSet dataSet2;
public static System.Data.DataSet dataSet3;
public static System.Data.DataSet dataSet4;
public static Random rand = new Random();
public static int NumOfRows = 300000;
static void Main(string[] args)
{
#region test1
Console.WriteLine("Starting");
Console.WriteLine("");
Stopwatch watch = new Stopwatch();
watch.Start();
MakeTable();
watch.Stop();
Console.WriteLine("Elapsed Time was: " + watch.ElapsedMilliseconds + " milliseconds.");
dataSet = null;
Console.WriteLine("");
Console.WriteLine("Completed.");
Console.WriteLine("");
#endregion
/*
#region test2
Console.WriteLine("Starting Test 2");
Console.WriteLine("");
watch.Reset();
watch.Start();
MakeTable2();
watch.Stop();
Console.WriteLine("Elapsed Time was: " + watch.ElapsedMilliseconds + " milliseconds.");
dataSet2 = null;
Console.WriteLine("");
Console.WriteLine("Completed Test 2.");
#endregion
#region test3
Console.WriteLine("");
Console.WriteLine("Starting Test 3");
Console.WriteLine("");
watch.Reset();
watch.Start();
MakeTable3();
watch.Stop();
Console.WriteLine("Elapsed Time was: " + watch.ElapsedMilliseconds + " milliseconds.");
dataSet3 = null;
Console.WriteLine("");
Console.WriteLine("Completed Test 3.");
#endregion
*/
#region test4
Console.WriteLine("Starting Test 4");
Console.WriteLine("");
watch.Reset();
watch.Start();
MakeTable4();
watch.Stop();
Console.WriteLine("Elapsed Time was: " + watch.ElapsedMilliseconds + " milliseconds.");
dataSet4 = null;
Console.WriteLine("");
Console.WriteLine("Completed Test 4.");
#endregion
//printTable();
Console.WriteLine("");
Console.WriteLine("Press Enter to Exit...");
Console.ReadLine();
}
private static void MakeTable()
{
DataTable table = new DataTable("Table 1");
DataColumn column;
DataRow row;
column = new DataColumn();
column.DataType = System.Type.GetType("System.Int32");
column.ColumnName = "id";
column.ReadOnly = true;
column.Unique = true;
table.Columns.Add(column);
for (int i = 65; i <= 90; i++)
{
column = new DataColumn();
column.DataType = System.Type.GetType("System.String");
column.ColumnName = "5-Letter Word " + (char)i;
column.AutoIncrement = false;
column.Caption = "Random Word " + (char)i;
column.ReadOnly = false;
column.Unique = false;
// Add the column to the table.
table.Columns.Add(column);
}
DataColumn[] PrimaryKeyColumns = new DataColumn[1];
PrimaryKeyColumns[0] = table.Columns["id"];
table.PrimaryKey = PrimaryKeyColumns;
// Instantiate the DataSet variable.
dataSet = new DataSet();
// Add the new DataTable to the DataSet.
dataSet.Tables.Add(table);
// Create three new DataRow objects and add
// them to the DataTable
for (int i = 0; i < NumOfRows; i++)
{
row = table.NewRow();
row["id"] = i;
for (int j = 65; j <= 90; j++)
{
row["5-Letter Word " + (char)j] = getRandomWord();
}
table.Rows.Add(row);
}
}
private static void MakeTable2()
{
DataTable table = new DataTable("Table 2");
DataColumn column;
DataRow row;
column = new DataColumn();
column.DataType = System.Type.GetType("System.Int32");
column.ColumnName = "id";
column.ReadOnly = true;
column.Unique = true;
table.Columns.Add(column);
for (int i = 65; i <= 90; i++)
{
column = new DataColumn();
column.DataType = System.Type.GetType("System.String");
column.ColumnName = "5-Letter Word " + (char)i;
column.AutoIncrement = false;
column.Caption = "Random Word " + (char)i;
column.ReadOnly = false;
column.Unique = false;
// Add the column to the table.
table.Columns.Add(column);
}
DataColumn[] PrimaryKeyColumns = new DataColumn[1];
PrimaryKeyColumns[0] = table.Columns["id"];
table.PrimaryKey = PrimaryKeyColumns;
// Instantiate the DataSet variable.
dataSet2 = new DataSet();
// Add the new DataTable to the DataSet.
dataSet2.Tables.Add(table);
// Create three new DataRow objects and add
// them to the DataTable
for (int i = 0; i < NumOfRows; i++)
{
row = table.NewRow();
row.BeginEdit();
row["id"] = i;
for (int j = 65; j <= 90; j++)
{
row["5-Letter Word " + (char)j] = getRandomWord();
}
row.EndEdit();
table.Rows.Add(row);
}
}
private static void MakeTable3()
{
DataTable table = new DataTable("Table 3");
DataColumn column;
column = new DataColumn();
column.DataType = System.Type.GetType("System.Int32");
column.ColumnName = "id";
column.ReadOnly = true;
column.Unique = true;
table.Columns.Add(column);
for (int i = 65; i <= 90; i++)
{
column = new DataColumn();
column.DataType = System.Type.GetType("System.String");
column.ColumnName = "5-Letter Word " + (char)i;
column.AutoIncrement = false;
column.Caption = "Random Word " + (char)i;
column.ReadOnly = false;
column.Unique = false;
// Add the column to the table.
table.Columns.Add(column);
}
DataColumn[] PrimaryKeyColumns = new DataColumn[1];
PrimaryKeyColumns[0] = table.Columns["id"];
table.PrimaryKey = PrimaryKeyColumns;
// Instantiate the DataSet variable.
dataSet3 = new DataSet();
// Add the new DataTable to the DataSet.
dataSet3.Tables.Add(table);
DataRow[] newRows = new DataRow[NumOfRows];
for (int i = 0; i < NumOfRows; i++)
{
newRows[i] = table.NewRow();
}
// Create three new DataRow objects and add
// them to the DataTable
for (int i = 0; i < NumOfRows; i++)
{
newRows[i]["id"] = i;
for (int j = 65; j <= 90; j++)
{
newRows[i]["5-Letter Word " + (char)j] = getRandomWord();
}
table.Rows.Add(newRows[i]);
}
}
private static void MakeTable4()
{
DataTable table = new DataTable("Table 2");
DataColumn column;
column = new DataColumn();
column.DataType = System.Type.GetType("System.Int32");
column.ColumnName = "id";
column.ReadOnly = true;
column.Unique = true;
table.Columns.Add(column);
for (int i = 65; i <= 90; i++)
{
column = new DataColumn();
column.DataType = System.Type.GetType("System.String");
column.ColumnName = "5-Letter Word " + (char)i;
column.AutoIncrement = false;
column.Caption = "Random Word " + (char)i;
column.ReadOnly = false;
column.Unique = false;
// Add the column to the table.
table.Columns.Add(column);
}
DataColumn[] PrimaryKeyColumns = new DataColumn[1];
PrimaryKeyColumns[0] = table.Columns["id"];
table.PrimaryKey = PrimaryKeyColumns;
// Instantiate the DataSet variable.
dataSet4 = new DataSet();
// Add the new DataTable to the DataSet.
dataSet4.Tables.Add(table);
// Create three new DataRow objects and add
// them to the DataTable
for (int i = 0; i < NumOfRows; i++)
{
table.Rows.Add(
new Object[] {
i,
getRandomWord(),
getRandomWord(),
getRandomWord(),
getRandomWord(),
getRandomWord(),
getRandomWord(),
getRandomWord(),
getRandomWord(),
getRandomWord(),
getRandomWord(),
getRandomWord(),
getRandomWord(),
getRandomWord(),
getRandomWord(),
getRandomWord(),
getRandomWord(),
getRandomWord(),
getRandomWord(),
getRandomWord(),
getRandomWord(),
getRandomWord(),
getRandomWord(),
getRandomWord(),
getRandomWord(),
getRandomWord(),
getRandomWord()
}
);
}
}
private static string getRandomWord()
{
char c0 = (char)rand.Next(65, 90);
char c1 = (char)rand.Next(65, 90);
char c2 = (char)rand.Next(65, 90);
char c3 = (char)rand.Next(65, 90);
char c4 = (char)rand.Next(65, 90);
return "" + c0 + c1 + c2 + c3 + c4;
}
private static void printTable()
{
foreach (DataRow row in dataSet.Tables[0].Rows)
{
Console.WriteLine( row["id"] + "--" + row["5-Letter Word A"] + " - " + row["5-Letter Word Z"] );
}
}
}
私はまだあなたの平行性を実際に見ていませんが、いくつかのことがあります。
まず、「ParsedFiles++;」を変更します。"Interlocked.Increment( ref ParsedFiles);" にするか、その周りをロックします。
次に、複雑なイベント駆動型の並列処理の代わりに、これに非常に適したパイプライン パターンを使用することをお勧めします。
並行コレクションからの並行キュー (またはブロック コレクション) を使用して、ステージを保持します。
最初の段階では、処理するファイルのリストが保持されます。
ワーカー タスクは、そのワーク リストからファイルをデキューし、解析してから、第 2 ステージに追加します。
第 2 段階では、ワーカー タスクが第 2 段階のキュー (データテーブルの完了したばかりのブロック) から項目を取得し、アップロードの準備ができた時点でそれらをデータベースにアップロードします。
編集:
途中で役立つコードのパイプラインバージョンを作成しました。
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Collections.Concurrent;
using System.Threading.Tasks;
using System.IO;
using System.Data;
namespace dataTableTesting2
{
class Program
{
private static const int BufferSize = 20; //Each buffer can only contain this many elements at a time
//This limits the total amount of memory
private static const int MaxBlockSize = 100;
private static BlockingCollection<string> buffer1 = new BlockingCollection<string>(BufferSize);
private static BlockingCollection<string[]> buffer2 = new BlockingCollection<string[]>(BufferSize);
private static BlockingCollection<Object[][]> buffer3 = new BlockingCollection<Object[][]>(BufferSize);
/// <summary>
/// Start Pipelines and wait for them to finish.
/// </summary>
static void Main(string[] args)
{
TaskFactory f = new TaskFactory(TaskCreationOptions.LongRunning, TaskContinuationOptions.None);
Task stage0 = f.StartNew(() => PopulateFilesList(buffer1));
Task stage1 = f.StartNew(() => ReadFiles(buffer1, buffer2));
Task stage2 = f.StartNew(() => ParseStringBlocks(buffer2, buffer3));
Task stage3 = f.StartNew(() => UploadBlocks(buffer3) );
Task.WaitAll(stage0, stage1, stage2, stage3);
/*
// Note for more workers on particular stages you can make more tasks for each stage, like the following
// which populates the file list in 1 task, reads the files into string[] blocks in 1 task,
// then parses the string[] blocks in 4 concurrent tasks
// and lastly uploads the info in 2 tasks
TaskFactory f = new TaskFactory(TaskCreationOptions.LongRunning, TaskContinuationOptions.None);
Task stage0 = f.StartNew(() => PopulateFilesList(buffer1));
Task stage1 = f.StartNew(() => ReadFiles(buffer1, buffer2));
Task stage2a = f.StartNew(() => ParseStringBlocks(buffer2, buffer3));
Task stage2b = f.StartNew(() => ParseStringBlocks(buffer2, buffer3));
Task stage2c = f.StartNew(() => ParseStringBlocks(buffer2, buffer3));
Task stage2d = f.StartNew(() => ParseStringBlocks(buffer2, buffer3));
Task stage3a = f.StartNew(() => UploadBlocks(buffer3) );
Task stage3b = f.StartNew(() => UploadBlocks(buffer3) );
Task.WaitAll(stage0, stage1, stage2a, stage2b, stage2c, stage2d, stage3a, stage3b);
*/
}
/// <summary>
/// Adds the filenames to process into the first pipeline
/// </summary>
/// <param name="output"></param>
private static void PopulateFilesList( BlockingCollection<string> output )
{
try
{
buffer1.Add("file1.txt");
buffer1.Add("file2.txt");
//...
buffer1.Add("lastFile.txt");
}
finally
{
output.CompleteAdding();
}
}
/// <summary>
/// Takes filnames out of the first pipeline, reads them into string[] blocks, and puts them in the second pipeline
/// </summary>
private static void ReadFiles( BlockingCollection<string> input, BlockingCollection<string[]> output)
{
try
{
foreach (string file in input.GetConsumingEnumerable())
{
List<string> list = new List<string>(MaxBlockSize);
using (StreamReader sr = new StreamReader(file))
{
int countLines = 0;
while (!sr.EndOfStream)
{
list.Add( sr.ReadLine() );
countLines++;
if (countLines > MaxBlockSize)
{
output.Add(list.ToArray());
countLines = 0;
list = new List<string>(MaxBlockSize);
}
}
if (list.Count > 0)
{
output.Add(list.ToArray());
}
}
}
}
finally
{
output.CompleteAdding();
}
}
/// <summary>
/// Takes string[] blocks from the second pipeline, for each line, splits them by tabs, and parses
/// the data, storing each line as an object array into the third pipline.
/// </summary>
private static void ParseStringBlocks( BlockingCollection<string[]> input, BlockingCollection< Object[][] > output)
{
try
{
List<Object[]> result = new List<object[]>(MaxBlockSize);
foreach (string[] block in input.GetConsumingEnumerable())
{
foreach (string line in block)
{
string[] splitLine = line.Split('\t'); //split line on tab
string cityName = splitLine[0];
int cityPop = Int32.Parse( splitLine[1] );
int cityElevation = Int32.Parse(splitLine[2]);
//...
result.Add(new Object[] { cityName, cityPop, cityElevation });
}
output.Add( result.ToArray() );
}
}
finally
{
output.CompleteAdding();
}
}
/// <summary>
/// Takes the data blocks from the third pipeline, and uploads each row to SQL Database
/// </summary>
private static void UploadBlocks(BlockingCollection<Object[][]> input)
{
/*
* At this point 'block' is an array of object arrays.
*
* The block contains MaxBlockSize number of cities.
*
* There is one object array for each city.
*
* The object array for the city is in the pre-defined order from pipeline stage2
*
* You could do a couple of things at this point:
*
* 1. declare and initialize a DataTable with the correct column types
* then, do the dataTable.Rows.Add( rowValues )
* then, use a Bulk Copy Operation to upload the dataTable to SQL
* http://msdn.microsoft.com/en-us/library/7ek5da1a
*
* 2. Manually perform the sql commands/transactions similar to what
* Kevin recommends in this suggestion:
* http://stackoverflow.com/questions/1024123/sql-insert-one-row-or-multiple-rows-data/1024195#1024195
*
* I've demonstrated the first approach with this code.
*
* */
DataTable dataTable = new DataTable();
//set up columns of dataTable here.
foreach (Object[][] block in input.GetConsumingEnumerable())
{
foreach (Object[] rowValues in block)
{
dataTable.Rows.Add(rowValues);
}
//do bulkCopy to upload table containing MaxBlockSize number of cities right here.
dataTable.Rows.Clear(); //Remove the rows when you are done uploading, but not the dataTable.
}
}
}
}
作業は、さまざまなタスクで実行できる 4 つの部分に分割されます。
処理するファイルのリストを作成する
そのリストからファイルを取得し、string[] に読み込みます
前の部分から文字列[]を取得し、それらを解析して、テーブルの各行の値を含むオブジェクト[]を作成します
行をデータベースにアップロードします
また、必要に応じて複数のワーカーが同じパイプライン ステージを実行できるように、各フェーズに複数のタスクを簡単に割り当てることもできます。
(ソリッド ステート ドライブを使用している場合を除き、ファイルから複数のタスクを読み取ることが役立つとは思えません。メモリ内でのジャンプは非常に遅いためです)。
また、プログラムの実行によってメモリ内のデータ量に制限を設定できます。
各バッファーは、最大サイズで初期化された BlockingCollection です。つまり、バッファーがいっぱいで、別のタスクが別の要素を追加しようとすると、そのタスクがブロックされます。
さいわい、Task Parallel Library はスマートで、タスクがブロックされると、ブロックされていない別のタスクをスケジュールし、後で最初のタスクがブロックされなくなったかどうかを確認します。
現在、各バッファーは 20 個のアイテムしか保持できず、各アイテムは 100 個しかありません。つまり、次のことを意味します。
buffer1 には、いつでも最大 20 個のファイル名が含まれます。
buffer2 には、いつでも、これらのファイルから最大 20 ブロックの文字列 (100 行で構成される) が含まれます。
buffer3 には、いつでも最大 20 項目のデータ ブロック (100 都市のオブジェクト値) が含まれます。
したがって、これには、20 個のファイル名、ファイルからの 2000 行、および 2000 の都市情報を保持するのに十分なメモリが必要です。(ローカル変数などには少し余分に)。
効率のために BufferSize と MaxBlockSize を増やしたいと思うでしょうが、そのままでもうまくいくはずです。
入力ファイルがなかったため、これをテストしていないことに注意してください。そのため、いくつかのバグがある可能性があります。