1

SQL Server テーブルにデータを挿入するための最終目的地タスクとして使用する SSIS 変換タスクがあります。SQL Server Destination タスクではなく変換タスクを使用する理由は、挿入先のテーブルの列がどうなるかを事前に知らないためです。

for each ループ コンテナーで、(97 形式の) アクセス DB を探しています。制御フローの残りの部分では、基本的に、新しい SQL データベースとテーブルも作成します。アクセスファイルは、私たちが「分」データベースと呼んでいるもので、別のプロセスによって収集された分単位の情報が含まれています。「分」データベースにちなんで名付けられた新しい SQL DB と、アクセス データベースからの特定の情報に基づいて作成された列を含む「MINUTE」というテーブルを作成する必要があります。クライアントごとに、サイトにあるパラメーターの数に基づいて、SQL Minute テーブルに作成する必要がある列の数を決定します。

データ フローには、OLE DB ソース コンポーネント (ソース - 分テーブル) とスクリプト変換タスク (宛先 - 分テーブル) の 2 つの主要なコンポーネントがあります。

「ソース - 分テーブル」は、アクセス データベースからデータを取得します。「Destination - Minute Table」は、データを変換し、適切な DB とテーブルに挿入します。

すべてが正常に機能します。491,000 以上のレコードを持つ DB でテストしたところ、1 分かかりました。ただし、50 を超えるパラメーターを持ち、アクセス データベースに 200 万以上のレコードが含まれる大規模な顧客の 1 つでテストしています。パッケージは約 477,000 レコードに到達するまで飛行し、その後ほとんど停止します。レコード数が更新されるまで 10 分、またはそれ以上待つことができます。その後、再び待ち続けます。

私は多くの調査を行い、見つけたすべての推奨事項とガイドラインに従いました。データソースがソートされていません。OLE DB ソースでテーブルなどの代わりに SQL コマンドを使用します。DefaultBufferMaxRows と DefaultBufferSize の値を何度も変更しましたが、同じ結果が得られました。

コード:

Public Class ScriptMain
Inherits UserComponent

Private conn As SqlConnection
Private cmd As SqlCommand
Private DBName As SqlParameter
Private columnsForInsert As SqlParameter
Private tableValues As SqlParameter
Private numberOfParams As Integer
Private db As String
Private folderPath As String
Private dbConn As String
Private folder As String
Private columnParamIndex As Integer
Private columnDate As DateTime
Private columnMinValue As Double
Private columnStatus As String
Private columnCnt1 As Int16
Private dateAdded As Boolean = False
Private columnStatusCnt As String
Private columnsConstructed As Boolean = False
Private buildValues As StringBuilder
Private columnValues As StringBuilder
Private i As Integer = 0

'This method is called once, before rows begin to be processed in the data flow.
'
'You can remove this method if you don't need to do anything here.
Public Overrides Sub PreExecute()
    MyBase.PreExecute()

    Try
        'Dim dbConnection As String = "Server=(local)\SQLExpress;Database=DataConversion;User ID=sa;Password=sa123;"
        'conn = New SqlConnection(dbConnection)
        'conn.Open()
        'cmd = New SqlCommand("dbo.InsertValues", conn) With {.CommandType = CommandType.StoredProcedure}

        'columnsForInsert = New SqlParameter("@Columns", SqlDbType.VarChar, -1) With {.Direction = ParameterDirection.Input}
        'cmd.Parameters.Add(columnsForInsert)

        'DBName = New SqlParameter("@DBName", SqlDbType.VarChar, -1) With {.Direction = ParameterDirection.Input}
        'cmd.Parameters.Add(DBName)

        'tableValues = New SqlParameter("@Values", SqlDbType.VarChar, -1) With {.Direction = ParameterDirection.Input}
        'cmd.Parameters.Add(tableValues)

        db = Variables.varMinFileName.ToString
        folder = Variables.varMinFolderName.ToString
        folderPath = folder & "\" & db & ".mdb"
        dbConn = "Provider=Microsoft.Jet.OLEDB.4.0;Data Source=" & folderPath

        Using SourceDataAdapter As OleDbDataAdapter = New OleDbDataAdapter("SELECT DISTINCT PARAM_INDEX FROM [MINUTE];", dbConn)
            Dim SourceDatatable As New DataTable

            SourceDataAdapter.Fill(SourceDatatable)

            numberOfParams = SourceDatatable.Rows.Count
        End Using

        'columnValues.Append("dtmTime, ")
        buildValues = New StringBuilder
        columnValues = New StringBuilder

        columnValues.Append("dtmTime, ")

    Catch ex As Exception
        Dim writer As New StreamWriter("C:\MinuteLog.log", True, System.Text.Encoding.ASCII)

        writer.WriteLine(ex.Message)
        writer.Close()
        writer.Dispose()
    Finally

    End Try
End Sub

' This method is called after all the rows have passed through this component.
'
' You can delete this method if you don't need to do anything here.
Public Overrides Sub PostExecute()
    MyBase.PostExecute()
    '
    ' Add your code here
    '
    buildValues = Nothing
    columnValues = Nothing
End Sub

Public Overrides Sub Input0_ProcessInput(Buffer As Input0Buffer)
    While Buffer.NextRow()
        Input0_ProcessInputRow(Buffer)
    End While
End Sub

'This method is called once for every row that passes through the component from Input0.
Public Overrides Sub Input0_ProcessInputRow(ByVal Row As Input0Buffer)
    Dim column As IDTSInputColumn100
    Dim rowType As Type = Row.GetType()
    Dim columnValue As PropertyInfo
    Dim result As Object
    Dim rtnValue As String = Variables.varMinFileName.Replace("_", "")
    Dim colName As String

    Try
        For Each column In Me.ComponentMetaData.InputCollection(0).InputColumnCollection
            columnValue = rowType.GetProperty(column.Name)

            colName = column.Name.ToString

            If Not colName.Contains("NULL") Then
                'If Not columnValue Is Nothing Then
                Select Case column.Name.ToString
                    Case "PARAM_INDEX"
                        'result = columnValue.GetValue(Row, Nothing)
                        result = Row.PARAMINDEX
                        columnParamIndex = CType(result, Byte)
                        If columnsConstructed = False And i <= numberOfParams - 1 Then
                            columnValues.Append(String.Format("VALUE_{0}, STATUS_{0}, ", columnParamIndex.ToString))
                        End If
                        Exit Select
                    Case "dtmTIME"
                        'result = columnValue.GetValue(Row, Nothing)
                        result = Row.dtmTIME
                        columnDate = CType(result, DateTime)
                        If dateAdded = False Then ' only need to add once since rows are vertical
                            buildValues.Append("'" & columnDate & "', ")
                            dateAdded = True
                        End If
                        Exit Select
                    Case "MIN_VALUE"
                        'result = columnValue.GetValue(Row, Nothing)
                        result = Row.MINVALUE
                        columnMinValue = CType(result, Double)
                        buildValues.Append(columnMinValue & ", ")
                        Exit Select
                    Case "MIN_STATUS"
                        'result = columnValue.GetValue(Row, Nothing)
                        result = Row.MINSTATUS
                        columnStatus = CType(result, String)
                        Exit Select
                    Case "MIN_CNT_1"
                        'result = columnValue.GetValue(Row, Nothing)
                        result = Row.MINCNT1
                        columnCnt1 = CType(result, Byte)
                        columnStatusCnt = columnStatus & "010" & columnCnt1.ToString.PadLeft(5, "0"c) & "-----"
                        buildValues.Append("'" & columnStatusCnt & "', ")
                    Case Else
                        Exit Select
                End Select
                'End If
            End If
        Next

        If i = numberOfParams - 1 Then
            If columnsConstructed = False Then
                columnValues.Remove(columnValues.Length - 2, 1)
            End If

            buildValues.Remove(buildValues.Length - 2, 1)

            Dim valueResult As String = buildValues.ToString()

            SetStoredProc()

            cmd.Parameters("@Columns").Value = columnValues.ToString
            cmd.Parameters("@DBName").Value = "[" & rtnValue & "].[dbo].[MINUTE]"
            cmd.Parameters("@Values").Value = valueResult
            cmd.ExecuteNonQuery()

            buildValues.Clear()

            columnsConstructed = True
            dateAdded = False
            columnParamIndex = 0
            columnMinValue = 0
            columnStatus = String.Empty
            columnCnt1 = 0

            i = 0
            conn.Close()
            conn.Dispose()
        Else
            i += 1
        End If
    Catch ex As Exception
        Dim writer As New StreamWriter("C:\MinuteLog.log", True, System.Text.Encoding.ASCII)

        writer.WriteLine(ex.Message)
        writer.Close()
        writer.Dispose()
    Finally
        'buildValues = Nothing
        'columnValues = Nothing
    End Try
End Sub

Private Sub SetStoredProc()
    Try
        Dim dbConnection As String = "Server=(local)\SQLExpress;Database=DataConversion;User ID=sa;Password=sa123;"
        conn = New SqlConnection(dbConnection)
        conn.Open()
        cmd = New SqlCommand("dbo.InsertValues", conn) With {.CommandType = CommandType.StoredProcedure}

        columnsForInsert = New SqlParameter("@Columns", SqlDbType.VarChar, -1) With {.Direction = ParameterDirection.Input}
        cmd.Parameters.Add(columnsForInsert)

        DBName = New SqlParameter("@DBName", SqlDbType.VarChar, -1) With {.Direction = ParameterDirection.Input}
        cmd.Parameters.Add(DBName)

        tableValues = New SqlParameter("@Values", SqlDbType.VarChar, -1) With {.Direction = ParameterDirection.Input}
        cmd.Parameters.Add(tableValues)
    Catch ex As Exception
        Dim writer As New StreamWriter("C:\MinuteLog.log", True, System.Text.Encoding.ASCII)

        writer.WriteLine(ex.Message)
        writer.Close()
        writer.Dispose()
    End Try
End Sub
End Class

まだ画像をアップロードできないので、ここに記載されている問題を理解するのに役立つ十分なスクリーン ショットを含む、作成したブログ リンクを含めました: SSIS は、変換タスク中に遅くなります

私のパッケージが 400k レコード後に遅くなり、妥当な時間内に 200 万以上のすべてのレコードを処理しない理由を特定するための助けをいただければ幸いです!

ありがとう、ジミー

4

2 に答える 2

1

完全なソリューションは、スクリーンショット付きの私のブログでここで見ることができます - SSIS のスローダウンが解決されました

大量のレコードが変換され、宛先として SQL Server に挿入されるときに SSIS の速度が低下するのを回避するために、SSIS パッケージを再設計しました。バッファーを通過するすべてのレコードに対してデータ変換タスクで挿入を行う代わりに、それを排除し、ストアド プロシージャを使用して一括挿入を行いました。これを実現するために、各アクセス DB から SQL Server インスタンスの「MINUTE」というテーブルにデータを読み込みます。この小さなテーブルにはアクセス DB と同じスキーマがあり、すべてのデータをこのテーブルにインポートするという面倒な作業は SSIS に任せました。データがインポートされた後、ストアド プロシージャを実行して、この分のテーブル (水平レコード) のデータを変換し、新しい宛先の MINUTE SQL テーブル (1 つの垂直レコード) に一括挿入します。

一括挿入を実行してデータを変換するストアド プロシージャは次のようになります。

PROCEDURE [dbo].[InsertMinuteBulk]
 -- Add the parameters for the stored procedure here
 (@Columns varchar(MAX), @DBName varchar(4000))
 AS
 BEGIN
 DECLARE @SQL varchar(MAX)

SET @SQL =’;WITH Base AS (
 SELECT dtmTime,
 param_index,
 CONVERT(nvarchar(16), MIN_VALUE) AS [VALUE_],
 CONVERT(nvarchar(3), MIN_STATUS) + ”000” + LEFT(replicate(”0”,5) + CONVERT(nvarchar(5), MIN_CNT_1),5) + ”—–” AS [STATUS_]
 FROM [DataConversion].[dbo].[MINUTE]
 )
 ,norm AS (
 SELECT dtmTime, ColName + CONVERT(varchar, param_index) AS ColName, ColValue
 FROM Base
 UNPIVOT (ColValue FOR ColName IN ([VALUE_], [STATUS_])) AS pvt
 )
 INSERT INTO ‘ + @DBName + ‘
SELECT *
 FROM norm
 PIVOT (MIN(ColValue) FOR ColName IN (‘+@Columns+’)) AS pvt’

EXEC (@SQL);

データ フロー タスクでは、「Minute Data Source」は ADO.NET データ ソースであり、SQL Server の宛先である「Minute Data Destination」にデータをフィードします。

制御フローでは、「Bulk Insert Minute Data」の最終タスクで Bulk Insert ストアド プロシージャを実行します。

パッケージは途切れることなく実行され、読み取り、変換、挿入するデータのサイズを考えるとかなり高速です。

パッケージを SSIS ジョブとして実行したところ、各アクセス DB に 200 万行を超える 7 か月 (またはアクセス DB の 7 分) 分のデータの変換を完了するのに 38 分かかりました。

于 2013-08-16T20:12:03.970 に答える