7

分散トランザクションでの複数のデータベース接続の動作を判断しようとしています。

一連のスレッドを生成する長時間実行プロセスがあり、各スレッドはそのDB接続などの管理を担当します。これらはすべてトランザクションスコープ内で実行され、各スレッドはオブジェクトを介してトランザクションに参加しDependentTransactionます。

このプロセスを並行して実行しようとすると、いくつかの問題が発生しました。つまり、トランザクションでクエリを同時に実行できないようなブロックがあるようです。

私が知りたいのは、トランザクションコーディネーターが同じDBへの複数の接続からのクエリをどのように処理するか、そしてスレッド間で接続オブジェクトを渡すことが推奨されるかどうかです。

MS SQLではトランザクションごとに1つの接続しか許可されていないことを読みましたが、同じトランザクションで同じDBへの複数の接続を作成して初期化できることは明らかです。接続を開くときに「別のセッションで使用されているトランザクションコンテキスト」例外が発生しない限り、スレッドを並列に実行することはできません。その結果、接続は同時に実行されるのではなく実行を待機する必要があり、最終的にコードは完了まで実行されますが、このロックの問題のためにアプリをスレッド化することによる純利益はありません。

コードは次のようになります。

    Sub StartThreads()
        Using Scope As New TransactionScope
            Dim TL(100) As Tasks.Task
            Dim dTx As DependentTransaction
            For i As Int32 = 0 To 100
                Dim A(1) As Object
                dTx = CType(Transaction.Current.DependentClone(DependentCloneOption.BlockCommitUntilComplete), DependentTransaction)
                'A(0) = some_other_data
                A(1) = dTx 'the Dependent Transaction

                TL(i) = Tasks.Task.Factory.StartNew(AddressOf Me.ProcessData, A) 'Start the thread and add it to the array
            Next

            Tasks.Task.WaitAll(TL) 'Wait for threads to finish

            Scope.Complete()
        End Using
    End Sub
    Dim TransLock As New Object
    Sub ProcessData(ByVal A As Object)
        Dim DTX As DependentTransaction = A(1)
        Dim Trans As Transactions.TransactionScope
        Dim I As Int32
        Do While True
            Try
                SyncLock (TransLock)
                    Trans = New Transactions.TransactionScope(DTX, TimeSpan.FromMinutes(1))
                End SyncLock
                Exit Do
            Catch ex As TransactionAbortedException
                If ex.ToString.Contains("Failure while attempting to promote transaction") Then
                ElseIf ex.Message = "The transaction has aborted." Then
                    Throw New Exception(ex.ToString)
                    Exit Sub
                End If
                I += 1
                If I > 5 Then
                    Throw New Exception(ex.ToString)
                End If
            Catch ex As Exception

            End Try
            Thread.Sleep(10)
        Loop
        Using Trans
            Using DALS As New DAC.DALScope
                Do While True
                    Try
                        SyncLock (TransLock)
                            'This opens two connection to the same DB for later use.
                            DALS.CurrentDAL.OpenConnection(DAC.DAL.ConnectionList.FirstConnection)
                            DALS.CurrentDAL.OpenConnection(DAC.DAL.ConnectionList.SecondConnection)
                        End SyncLock
                        Exit Do
                    Catch ex As Exception
                        'This is usually where I find the bottleneck
                        '"Transaction context in use by another session" is the exception that I get
                        Thread.Sleep(100)
                    End Try
                Loop

                '*****************
                'Do some work here
                '*****************

                Trans.Complete()
            End Using
        End Using
        DTX.Complete()
    End Sub

編集

私のテストは、これができないことを決定的に示しました。複数の接続がある場合、または同じ接続が使用されている場合でも、トランザクション内のすべての要求または質問は順番に処理されます。

おそらく、彼らは将来この振る舞いを変えるでしょう。

4

1 に答える 1

9

まず、SQL Serverトランザクションについてあちこちで読んだ内容を、ローカルと分散の2つの異なるケースに分ける必要があります。

ローカルSQLトランザクション

  • SQL Serverでは、ローカルトランザクションごとに1つの要求のみを実行できます。
  • デフォルトでは、1つのセッションのみがローカルトランザクションに登録できます。sp_getbindtokenおよびsp_bindsessionを使用して、複数のセッションをローカルトランザクションに登録できます。セッションは、いつでもリクエストを実行する1つだけに制限されます。
  • 複数のアクティブな結果セット(MARS)を使用すると、1つのセッションで複数の要求を実行できます。すべてのリクエストは、同じローカルトランザクションに登録する必要があります。

分散トランザクション

  • 複数のセッションで、ローカルトランザクションを単一の分散トランザクションに登録できます。
  • 各セッションは引き続きローカルトランザクションに登録されますが、ローカルトランザクションに関する上記のすべての制限が適用されます。
  • 分散トランザクションに登録されたローカルトランザクションは、分散トランザクションによって調整された2フェーズコミットの対象となります
  • 分散トランザクションに登録されているインスタンス上のすべてのローカルトランザクションは、依然として独立したローカルトランザクションであり、主に、競合するロック名前空間があることを意味します。

したがって、クライアントが.Net TransactionScopeを作成し、このトランザクションスコープの下で同じサーバー上で複数の要求を実行すると、これらの要求はすべて分散トランザクションに登録されたローカルトランザクションになります。簡単な例:

class Program
    {
        static string sqlBatch = @"
set nocount on;
declare @i int;
set @i = 0;
while @i < 100000
begin
    insert into test (a) values (replicate('a',100));
    set @i = @i+1;
end";

        static void Main(string[] args)
        {
            try
            {
                TransactionOptions to = new TransactionOptions();
                to.IsolationLevel = IsolationLevel.ReadCommitted;
                using (TransactionScope scp = new TransactionScope(TransactionScopeOption.Required, to))
                {
                    using (SqlConnection connA = new SqlConnection(Settings.Default.connString))
                    {
                        connA.Open();
                        using (SqlConnection connB = new SqlConnection(Settings.Default.connString))
                        {
                            connB.Open();

                            SqlCommand cmdA = new SqlCommand(sqlBatch, connA);
                            SqlCommand cmdB = new SqlCommand(sqlBatch, connB);

                            IAsyncResult arA = cmdA.BeginExecuteNonQuery();
                            IAsyncResult arB = cmdB.BeginExecuteNonQuery();

                            WaitHandle.WaitAll(new WaitHandle[] { arA.AsyncWaitHandle, arB.AsyncWaitHandle });

                            cmdA.EndExecuteNonQuery(arA);
                            cmdB.EndExecuteNonQuery(arB);
                        }
                    }
                    scp.Complete();
                }
            }
            catch (Exception e)
            {
                Console.Error.Write(e);
            }
        }
    }

ダミーのテストテーブルを作成します。

create table test (id int not null identity(1,1) primary key, a varchar(100));

サンプルのコードを実行します。両方のリクエストが並行して実行され、それぞれがテーブル内の100k行を取得し、トランザクションスコープが完了すると両方がコミットすることがわかります。したがって、発生している問題はSQL ServerにもTransactionScopeにも関連しておらず、説明したシナリオを簡単に処理できます。さらに、コードは非常に単純で単純であり、依存トランザクションを作成したり、クローンを作成したり、トランザクションを促進したりする必要はありません。

更新しました

明示的なスレッドと依存トランザクションの使用:

 private class ThreadState
    {
        public DependentTransaction Transaction {get; set;}
        public EventWaitHandle Done {get; set;}
        public SqlConnection Connection { get; set; }
    }
    static void Main(string[] args)
    {
        try
        {
            TransactionOptions to = new TransactionOptions();
            to.IsolationLevel = IsolationLevel.ReadCommitted;
            using (TransactionScope scp = new TransactionScope(TransactionScopeOption.Required, to))
            {
                ThreadState stateA = new ThreadState 
                {
                    Transaction = Transaction.Current.DependentClone(DependentCloneOption.BlockCommitUntilComplete),
                    Done = new AutoResetEvent(false),
                    Connection = new SqlConnection(Settings.Default.connString),
                };
                stateA.Connection.Open();
                ThreadState stateB = new ThreadState
                {
                    Transaction = Transaction.Current.DependentClone(DependentCloneOption.BlockCommitUntilComplete),
                    Done = new AutoResetEvent(false),
                    Connection = new SqlConnection(Settings.Default.connString),
                };
                stateB.Connection.Open();

                ThreadPool.QueueUserWorkItem(new WaitCallback(Worker), stateA);
                ThreadPool.QueueUserWorkItem(new WaitCallback(Worker), stateB);

                WaitHandle.WaitAll(new WaitHandle[] { stateA.Done, stateB.Done });

                scp.Complete();

                //TODO: dispose the open connections
            }

        }
        catch (Exception e)
        {
            Console.Error.Write(e);
        }
    }

    private static void Worker(object args)
    {
        Debug.Assert(args is ThreadState);
        ThreadState state = (ThreadState) args;
        try
        {
            using (TransactionScope scp = new TransactionScope(state.Transaction))
            {
                SqlCommand cmd = new SqlCommand(sqlBatch, state.Connection);
                cmd.ExecuteNonQuery();
                scp.Complete();
            }
            state.Transaction.Complete();
        }
        catch (Exception e)
        {
            Console.Error.WriteLine(e);
            state.Transaction.Rollback();
        }
        finally
        {
            state.Done.Set();
        }

    }
于 2010-02-02T19:36:30.787 に答える