1

私は、ジョブをスケジュールして実行するためのメッセージングシステムとしてサービスブローカーを使用しています。Eashジョブは、エンジンと呼ばれる複数のタスクまたはステップで構成されます。

私のサービスブローカーオブジェクトは次のとおりです。

  • メッセージタイプ:SubmitJob、JobResponse、SubmitTask、TaskResponse
  • 契約:JobContract、TaskContract
  • キュー:ClientQueue、JobQueue、EngineQueue、ExternalActivatorQueue
  • サービス:ClientService、JobService、EngineService、ExternalActivatorService
  • イベント通知:EventNotificationEngineQueue

ジョブキューに内部アクティベーション(ストアドプロシージャ)があります。SubmitJob MessageTypesの場合、ストアドプロシージャはそのジョブの最初のタスクを取得し、EngineServiceとのダイアログを開始し、そのキューにメッセージを送信します(StartTask)TaskResponses MessageTypeの場合、このジョブにさらにタスクがあるかどうかを確認します。その後、エンジンキューに送信されます。送信されない場合、このジョブのタスクは完了します(メッセージを送信してクリーンアップします)。

それはすべてうまく機能しているようです。ただし、EngineQueueメッセージを処理する外部アプリ(エンジン)が必要です。そのため、Microsoftの外部アクティベーションメカニズム(ssbeas.exe)を使用しています。長い時間がかかりましたが、ようやく機能するようになりました。メッセージがEngineQueueに入り、EventNotificationEngineQueueがアプリケーションを起動し、キューを排出します。ここまでは順調ですね。ただし、私のアプリは複数回実行されているようです。私のテストアプリケーションは、完了時に電子メールを送信するように構成されています。1つのタスクで1つのジョブのみを送信しますが、複数の電子メールを受け取ります(プログラムが複数回実行されたことを示します)。

これが私のアプリ(vb.net)のコードです(ブローカーはサービスブローカーサービス(送信、受信など)をカプセル化するオブジェクトです):

While True

            oBroker.tran = oBroker.cnn.BeginTransaction

            oBroker.Receive("EMGQueue", msgType, msg, serviceInstance, dialogHandle)

            If dialogHandle = System.Guid.Empty Then
                'Console.WriteLine("An Error Occurred. Program Terminated.")
                oBroker.tran.Commit()
                Exit While
            End If

            ConsoleWriteLine("Received: " & msgType)

            If (msg Is Nothing) Then
                ConsoleWriteLine("commiting and exiting")
                oBroker.tran.Commit()
                Exit While

            Else
                Select Case (msgType)
                    Case "SubmitTask"
                        ProcessMsg(oBroker.cnn, oBroker.tran, msgType, msg, iTaskID, iTaskKey)
                        oBroker.Send(dialogHandle, "<TaskStatus>1</TaskStatus>'")

                    Case "http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog"
                        oBroker.EndDialog(dialogHandle)

                    Case "http://schemas.microsoft.com/SQL/ServiceBroker/Error"""
                        oBroker.EndDialog(dialogHandle)

                End Select
            End If


            ConsoleWriteLine("commiting...")
            oBroker.tran.Commit()

        End While

アプリが複数回実行されている理由はわかりませんが、それを超えると、後続のバージョンがキュー内のメッセージを引き続き表示できる理由がわかりません。結局のところ、最初の化身はキュー内のメッセージをロックする必要がありました。アプリの実行中にメッセージを受信するためにクエリマネージャーを使用してテストできたため、キューがロックされ、ブロックされました。

EAService.configの同時実行性の値を試してみました。min="0"とmax="1"に設定すると、アプリが実行されているように見える回数が2回に減りました。以前は、min="0"とmax="10"を使用して、アプリを実行していました。 18部のようでした。

私はそれが理にかなっていて、長さについて申し訳ないことを願っています。誰かがここで何が起こっているのか考えがありますか?.netアプリのコーディングを間違えましたか?

ありがとうマーティン

編集:エンジンの実行後に作成されるログを追加します。

2010-02-0809:31:39-メイン
2010-02-0809:31:39-受信:SubmitTask
2010-02-08 09:31:39-ProcessMsg
2010-02-08 09 :31: <Task><TaskID> 5</TaskID><TaskKey>2</TaskKey></Task>
39-2010 -02-08 09:31:39-DoWork2010-02-0809:31:39-
メールの送信2010-02-0809: 31:40-コミット
中...
2010-02-0809:31:40-睡眠
2010-02-0809:32:10-睡眠が完了しました。
2010-02-0809:32:10-メイン完了
2010-02-0809:32:10-外部でアクティブ化されたアプリケーションが成功し、終了します。
2010-02-0809:32:10-メイン
2010-02-0809:32:10-受信:SubmitTask
2010-02-08 09:32:10-ProcessMsg
2010-02-08 09 :32: <Task><TaskID> 5</TaskID><TaskKey> 2</TaskKey></Task>
10-2010 -02-08 09:32:10-DoWork2010-02-0809:32:10-
メールの送信
2010-02-0809:32:10-コミットしています...
2010-02-0809:32:10-スリープ
2010-02-0809:32:40-スリープが完了しました。
2010-02-0809:32:40-メイン完了
2010-02-0809:32:40-外部でアクティブ化されたアプリケーションが成功し、終了します。

アプリ全体を2回通過することがわかります(メイン、受信、実行、送信メール、完了)。

編集2:これは、ジョブがキューに送信されたときにアクティブ化されるストアドプロシージャ(debugginステートメントおよびすべて)の最新の化身です。

ALTER PROCEDURE [dbo].[pr_ProcessJob] AS BEGIN

    DECLARE     @message_type_name      sysname
    DECLARE     @dialog             uniqueidentifier 
    DECLARE     @message_sequence_number    bigint
    DECLARE     @error_message_sequence_number  bigint
    DECLARE     @message_body           xml
    DECLARE     @cgid               uniqueidentifier
    DECLARE     @JobID              int
    DECLARE     @Params             varchar(MAX)

    DECLARE     @ErrorNumber            bigint
    DECLARE     @ErrorText          nvarchar(MAX)

    DECLARE         @TaskID             int 
    DECLARE     @TaskService            varchar(100)
    DECLARE     @TaskKey            int
    DECLARE     @chEngine           uniqueidentifier
    DECLARE     @Step               int
    DECLARE     @NextStep           int
    DECLARE     @jobch              uniqueidentifier
    DECLARE     @EngineMsg          XML
    DECLARE     @TimeStarted            datetime
    DECLARE     @TaskStatus         int

    --  This procedure will just sit in a loop processing Task messages in the queue
    --  until the queue is empty
    SET NOCOUNT ON
    SET @error_message_sequence_number = -100


    PRINT 'pr_ProcessJob: Start'

    WHILE (1=1) BEGIN

        BEGIN TRY

            PRINT 'pr_ProcessJob: BEGIN TRANSACTION'
            BEGIN TRANSACTION

            -- first lets get the conversation group id for the next message.
            WAITFOR (
                GET CONVERSATION GROUP @cgid FROM [JobQueue]
            ), TIMEOUT 1000

            IF (@@ROWCOUNT = 0) BEGIN

                PRINT 'pr_ProcessJob: ROLLBACK TRANSACTION (GET CONVERSATION)'
                ROLLBACK TRANSACTION
                BREAK
            END

            PRINT @CGID

            -- Inner Loop (Message Processing)
            WHILE (1=1) BEGIN

                -- Receive the next available message
                PRINT 'Receiving Message.'
                WAITFOR (
                    RECEIVE top(1) -- just handle one message at a time
                        @message_type_name=message_type_name,  --the type of message received
                        @message_body=CAST(message_body AS XML),      -- the message contents
                        @message_sequence_number=message_sequence_number,
                        @dialog = conversation_handle    -- the identifier of the dialog this message was received on
                        FROM [JobQueue]
                        WHERE conversation_group_id=@cgid
                ), TIMEOUT 3000  -- if the queue is empty for three seconds, give up and go away

                -- If we didn't get anything, the queue is empty so bail out

                IF (@@ROWCOUNT = 0) BEGIN               
                    PRINT 'pr_ProcessJob::WaitFor - No messages for conversation group bailing out'
                    BREAK
                END --IF (@@ROWCOUNT = 0)

                PRINT 'Message Received: ' + @message_type_name
                SAVE TRANSACTION MessageReceivedSavePoint

                -- Handle the End Conversation Message
                IF (@message_type_name = 'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog') BEGIN
                    -- When we receive an End Dialog, we need to end also.
                    PRINT 'ENDING CONVERSATION'
                    END CONVERSATION @dialog
                END -- IF (@message_type_name = 'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog') BEGIN
                ELSE BEGIN
                    -- Handle the Conversation Error Message
                    IF (@message_type_name = 'http://schemas.microsoft.com/SQL/ServiceBroker/Error') BEGIN
                        -- We can't return anything here because the dialog at the other end is closed so just log 
                        -- an error and close our end of the conversation.

                        PRINT 'ENDING CONVERSATION w/Error'
                        END CONVERSATION @dialog
                    END -- (@message_type_name = 'http://schemas.microsoft.com/SQL/ServiceBroker/Error')
                    ELSE BEGIN
                        IF (@message_type_name = 'SubmitJob')  BEGIN -- Process normal Job messages..   

                            PRINT 'pr_ProcessJob:: Message Type SubmitJob received.'

                            -- Pull the information out of the task message with XQuery
                            SELECT @JobID = @message_body.value('(/Job/JobID)[1]', 'int'),
                                @Params = @message_body.value('(/Job/Params)[1]', 'varchar(MAX)')

                            PRINT 'pr_ProcessJob::@JobID = ' + cast(@jobID as varchar(10))
                            PRINT 'pr_ProcessJob::@Params = ' + @Params


                            SELECT  @ErrorNumber = 0, @ErrorText = N''

                            -- Do something with the job
                            -- save state

                            -- we are looking for the first step 
                            SET @Step=1

                            PRINT 'Selecting from JobTask'
                            ---------------------------------------------------------
                            -- Get the next task
                            ---------------------------------------------------------
                            SELECT  TOP 1
                                @TaskID=task.TaskID, 
                                @TaskService=tt.TaskService, 
                                @TaskKey =Task.TaskKey
                            FROM JobTask task INNER JOIN TaskType tt
                                ON task.TaskTypeID = tt.TaskTypeID      
                            WHERE task.jobID=@JobID AND task.enabled=1 and task.step>=@step
                            ORDER BY Task.step
                            ---------------------------------------------------------
                            PRINT 'Selecting from JobTask: complete'

                            PRINT 'Step='+cast(@step as varchar(max))               
                            PRINT 'TaskID='+cast(@TaskID as varchar(max))
                            PRINT 'TaskService='+cast(@TaskService as varchar(max))
                            PRINT'TaskKey='+cast(@TaskKey as varchar(max))                      

                            PRINT 'BEGIN DIALOG with ' + @TaskService   

                            BEGIN DIALOG @chEngine
                                FROM SERVICE [JobService]
                                TO SERVICE @TaskService
                                ON CONTRACT [TaskContract]
                                WITH RELATED_CONVERSATION=@dialog;

                            PRINT 'BEGIN DIALOG with ' + @TaskService+' completed.'

                            SET @EngineMsg = CAST('<Task><TaskID>'+ str(@TaskID)+'</TaskID><TaskKey>'+ str(@Taskkey)+'</TaskKey></Task>' as XML);

                            PRINT CAST(@EngineMsg as varchar(max))

                            PRINT 'Sending Message Type SubmitTask to Engine.';

                            SEND ON CONVERSATION @chEngine
                                MESSAGE TYPE SubmitTask
                                (@EngineMsg)

                            PRINT 'Inserting into jobstate'

                            INSERT INTO JobState(cgid, jobch, jobID, step) VALUES(@cgid, @dialog, @jobid, @step)

                        END -- IF (@message_type_name = 'SubmitJob') 
                        ELSE BEGIN

                            IF (@message_type_name = 'TaskResponse')  BEGIN 

                                PRINT 'Processing MessageType TaskResponse'

                                SELECT @TaskStatus = @message_body.value('(/TaskStatus)[1]', 'int')

                                PRINT 'pr_ProcessJob::@TaskStatus = ' + cast(@TaskStatus as varchar(10))

                                PRINT 'Loading State'

                                --LoadState
                                SELECT  @JobID=jobid, 
                                    @Step=Step, 
                                    @jobch=jobch,
                                    @TimeStarted=sysdate    
                                FROM Jobstate
                                WHERE cgid=@cgid

                                PRINT 'Loading State complete' 

                                PRINT @jobch

                                PRINT 'Selecting from JobTask'
                                ---------------------------------------------------------
                                -- Get the next task
                                ---------------------------------------------------------
                                SELECT  TOP 1
                                    @TaskID=task.TaskID, 
                                    @TaskService=tt.TaskService, 
                                    @TaskKey =task.TaskKey,
                                    @NextStep = task.Step

                                FROM JobTask task INNER JOIN TaskType tt
                                    ON task.TaskTypeID = tt.TaskTypeID      
                                WHERE task.jobID=@JobID AND task.enabled=1 and task.step>@step
                                ORDER BY Task.step
                                ---------------------------------------------------------
                                PRINT 'Selecting from JobTask: complete'

                                PRINT 'NextTask: ['+@TaskService+']'

                                if (@TaskService is null) BEGIN

                                    PRINT '@TaskService is NULL: BEGIN'

                                    -- no more tasks
                                    --END CONVERSATION @jobch

                                    PRINT 'Removing from state table'                               
                                    DELETE FROM JobState 
                                    WHERE @cgid=cgid
                                    PRINT @@ROWCOUNT
                                    PRINT 'Removing from state table-completed'

                                    DECLARE @ResponseDoc xml

                                    -- Send a response message saying we're done
                                    DECLARE @Time nvarchar(100)                 
                                    SET @Time = cast(getdate() as nvarchar(100))

                                    DECLARE @TimeStartedText nvarchar(100)
                                    SET @TimeStartedText = cast(@TimeStarted as nvarchar(100))

                                    SET @ResponseDoc = N'<Job/>'
                                    SET @ResponseDoc.modify(
                                    'insert (<JobID>{ sql:variable("@JobID") }</JobID>, 
                                    <JobStatus>{ sql:variable("@ErrorNumber") }</JobStatus>,
                                    <ErrorNumber>{ sql:variable("@ErrorNumber") }</ErrorNumber>,
                                    <ErrorText>{ sql:variable("@ErrorText") }</ErrorText>,
                                    <TimeStarted>{ sql:variable("@TimeStartedText") }</TimeStarted>, 
                                    <TimeCompleted>{ sql:variable("@Time") }</TimeCompleted>) 
                                    as last into /Job [1]'); 

                                    SEND ON CONVERSATION @jobch 
                                        MESSAGE TYPE [JobResponse] (@ResponseDoc)

                                    END CONVERSATION @jobch             

                                    PRINT '@TaskService is NULL: END'

                                END --if (@TaskService is null) BEGIN
                                ELSE BEGIN
                                    -- there are more tasks 
                                    PRINT '@TaskService is not null: BEGIN'

                                    PRINT 'BEGIN DIALOG with ' + @TaskService

                                    --another task
                                    BEGIN DIALOG @chEngine
                                        FROM SERVICE [JobService]
                                        TO SERVICE @TaskService
                                        ON CONTRACT [TaskContract]
                                        WITH RELATED_CONVERSATION=@dialog;

                                    SET @EngineMsg = CAST('<Task><TaskID>'+ str(@TaskID)+'</TaskID><TaskKey>'+ str(@Taskkey)+'</TaskKey></Task>' as XML);

                                    PRINT 'SEND ' +cast(@EngineMsg as varchar(max));

                                    SEND ON CONVERSATION @chEngine
                                    MESSAGE TYPE SubmitTask (@EngineMsg)                                    

                                    PRINT 'SAVING State: ' +str(@step)

                                    -- save state
                                    Update JobState
                                        SET step = @NextStep
                                    FROM JobState
                                    WHERE cgid=@cgid

                                    PRINT '@TaskService is not null: END'

                                END -- ELSE (@TaskService is NOT NULL)

                                PRINT 'Processing MessageType TaskResponse...Complete'

                            END -- IF (@message_type_name = 'TaskCompleted')

                        END -- ELSE IF (@message_type_name <> 'JobRequest') 
                    END -- ELSE  (@message_type_name <> 'http://schemas.microsoft.com/SQL/ServiceBroker/Error')
                END -- ELSE (@message_type_name <> 'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog')
            END -- WHILE (1=1) BEGIN

            PRINT 'COMMIT TRANSACTION'
            COMMIT TRANSACTION

        END TRY
        BEGIN CATCH

            --rollback transaction

            DECLARE @ErrNum int
            DECLARE @ErrMsg varchar(max)


                SELECT
                    ERROR_NUMBER() AS ErrorNumber
                    ,ERROR_SEVERITY() AS ErrorSeverity
                    ,ERROR_STATE() AS ErrorState
                    ,ERROR_PROCEDURE() AS ErrorProcedure
                    ,ERROR_LINE() AS ErrorLine
                    ,ERROR_MESSAGE() AS ErrorMessage;

            PRINT 'pr_ProcessJob: ROLLBACK (CATCH)'


            if (error_number()=1205) BEGIN
                -- a deadlock occurred. We can try it again.
                PRINT 'pr_ProcessJob: ROLLBACK TRANSACTION (CATCH)'
                ROLLBACK TRANSACTION
                --CONTINUE
            END --if (error_number()=1205)
            ELSE BEGIN
                if (error_number()=9617) BEGIN
                    PRINT 'pr_ProcessJob: ROLLBACK TRANSACTION (CATCH)'
                    ROLLBACK TRANSACTION
                END
                ELSE BEGIN -- (error_number()<>9617) 
                    -- another error occurred.  The message cant be procesed sucessfully
                    PRINT 'pr_ProcessJob: ROLLBACK TRANSACTION to MessageReceivedSavePoint (CATCH)'
                    ROLLBACK TRANSACTION MessageReceivedSavePoint

                END --ELSE (error_number()<>9617)       
            END -- if (error_number()<>1205)
        END CATCH

    END -- while loop

PRINT 'pr_ProcessJob: Complete' 

END -- CREATE PROCEDURE [dbo].[ProcessJobProc]
4

1 に答える 1

1

アプリの後続のインスタンスがメッセージを表示できる場合、それは1つのことだけを意味します。つまり、前のインスタンスが受信をロールバックしている必要があります。一見、提供したコードは問題ないように見えるので、使用しているオブジェクトモデルのエラーを探します。そのメソッドの1つが例外をスローした場合、アプリは終了し、メッセージを受信したトランザクションは自動的にロールバックされます。

アプリのインスタンスを一度に1つだけ実行する場合は、[最大]設定を1のままにします。そうしないと、負荷に対応するために、デフォルトでより多くのインスタンスが同時に実行されます。

于 2010-02-09T19:07:44.603 に答える