最近、マスター マッピング ファイルを読み取って、フラット ファイル名に基づいてデータをテーブルに送り込む自動 ETL プロセスの作成を任されました。私は SqlBulkCopy を使用することにしましたが、すべて問題ないようです。IDataReader インターフェイスは、フラット ファイルを読み取るために実装されました。SQL Server のメタデータには、1 対 1 のデータ マッピング用に多数の列が用意されており、空の文字列を含むファイルに出くわすまで、すべてが機能していました。SqlBulkCopy は、「データ ソースからの String 型の指定された値を、指定されたターゲット列の int 型に変換できません。」という例外をスローします。話の終わり、この列の DB タイプが INT NULL であることさえ気にしません。メタデータをさらに解釈し、特定の列のデータ型を抽出し、抽出した情報に基づいて DataSet を構築できることを知っています。フラットファイルからデータを再キャストし、うまく機能する強く型付けされたソリューションを自分で手に入れますが、私は怠惰な男で、彼の幸せはマイクロソフトによってひどく傷つけられたように感じています。突然の問題。お時間をいただきありがとうございます。
List<String> fileNames;
DateTime startJobTime = DateTime.Now;
Console.WriteLine("---------------------------------------------");
Console.WriteLine("Start Time: " + startJobTime);
Console.WriteLine("---------------------------------------------");
using (SqlConnection sqlCon = new SqlConnection(sqlConnection))
{
try
{
sqlCon.Open();
sqlCon.ChangeDatabase(edwDBName);
// Get service information for staging job
UnivStage us = GetStagingJobInfo(jobName, sqlCon);
us.StartJobTime = startJobTime;
// Get a list of file names
fileNames = GetFileList(us, args);
if (fileNames.Count > 0)
{
// Truncate Staging Table
TruncateStagingTable(us, sqlCon);
// Close and dispose of sqlCon2 connection
sqlCon.Close();
Console.WriteLine("Processing files: ");
foreach (String fileName in fileNames)
Console.WriteLine(fileName);
Console.WriteLine();
}
else
{
Console.WriteLine("No files to process.");
Environment.Exit(0);
}
// Re-open Sql Connection
sqlCon.Open();
sqlCon.ChangeDatabase(stagingDBName);
foreach (String filePath in fileNames)
{
using (SqlTransaction sqlTran = sqlCon.BeginTransaction())
{
using (FlatFileReader ffReader = new FlatFileReader(filePath, us.Delimiter))
{
using (SqlBulkCopy sqlBulkCopy =
new SqlBulkCopy(sqlCon, SqlBulkCopyOptions.Default, sqlTran))
{
SqlConnection sqlCon2 = new SqlConnection(sqlConnection);
SetColumnList(sqlCon2, us, sqlBulkCopy);
sqlBulkCopy.BatchSize = 1000;
sqlBulkCopy.DestinationTableName =
us.StagingSchemaName + "." + us.StagingTableName;
sqlBulkCopy.WriteToServer(ffReader);
sqlTran.Commit();
sqlCon2.Close();
}
}
}
}
sqlCon.ChangeDatabase(edwDBName);
sqlCon.Close();
sqlCon.Open();
SetRowCount(us, sqlCon);
sqlCon.Close();
us.EndJobTime = DateTime.Now;
sqlCon.Open();
LogStagingProcess(us, sqlCon);
sqlCon.Close();
Console.WriteLine(us.ProcessedRowCount + " rows inserted.");
Console.WriteLine("---------------------------------------------");
Console.WriteLine("Success! End Time: " + us.EndJobTime);
Console.WriteLine("---------------------------------------------");
Console.ReadLine();
}
catch (SqlException e)
{
RenderExceptionMessagesAndExit(e,
"Exception have occured during an attempt to utilize SqlBulkCopy\n");
}
}