私は、ジョブをスケジュールして実行するためのメッセージングシステムとしてサービスブローカーを使用しています。Eashジョブは、エンジンと呼ばれる複数のタスクまたはステップで構成されます。
私のサービスブローカーオブジェクトは次のとおりです。
- メッセージタイプ:SubmitJob、JobResponse、SubmitTask、TaskResponse
- 契約:JobContract、TaskContract
- キュー:ClientQueue、JobQueue、EngineQueue、ExternalActivatorQueue
- サービス:ClientService、JobService、EngineService、ExternalActivatorService
- イベント通知:EventNotificationEngineQueue
ジョブキューに内部アクティベーション(ストアドプロシージャ)があります。SubmitJob MessageTypesの場合、ストアドプロシージャはそのジョブの最初のタスクを取得し、EngineServiceとのダイアログを開始し、そのキューにメッセージを送信します(StartTask)TaskResponses MessageTypeの場合、このジョブにさらにタスクがあるかどうかを確認します。その後、エンジンキューに送信されます。送信されない場合、このジョブのタスクは完了します(メッセージを送信してクリーンアップします)。
それはすべてうまく機能しているようです。ただし、EngineQueueメッセージを処理する外部アプリ(エンジン)が必要です。そのため、Microsoftの外部アクティベーションメカニズム(ssbeas.exe)を使用しています。長い時間がかかりましたが、ようやく機能するようになりました。メッセージがEngineQueueに入り、EventNotificationEngineQueueがアプリケーションを起動し、キューを排出します。ここまでは順調ですね。ただし、私のアプリは複数回実行されているようです。私のテストアプリケーションは、完了時に電子メールを送信するように構成されています。1つのタスクで1つのジョブのみを送信しますが、複数の電子メールを受け取ります(プログラムが複数回実行されたことを示します)。
これが私のアプリ(vb.net)のコードです(ブローカーはサービスブローカーサービス(送信、受信など)をカプセル化するオブジェクトです):
While True
oBroker.tran = oBroker.cnn.BeginTransaction
oBroker.Receive("EMGQueue", msgType, msg, serviceInstance, dialogHandle)
If dialogHandle = System.Guid.Empty Then
'Console.WriteLine("An Error Occurred. Program Terminated.")
oBroker.tran.Commit()
Exit While
End If
ConsoleWriteLine("Received: " & msgType)
If (msg Is Nothing) Then
ConsoleWriteLine("commiting and exiting")
oBroker.tran.Commit()
Exit While
Else
Select Case (msgType)
Case "SubmitTask"
ProcessMsg(oBroker.cnn, oBroker.tran, msgType, msg, iTaskID, iTaskKey)
oBroker.Send(dialogHandle, "<TaskStatus>1</TaskStatus>'")
Case "http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog"
oBroker.EndDialog(dialogHandle)
Case "http://schemas.microsoft.com/SQL/ServiceBroker/Error"""
oBroker.EndDialog(dialogHandle)
End Select
End If
ConsoleWriteLine("commiting...")
oBroker.tran.Commit()
End While
アプリが複数回実行されている理由はわかりませんが、それを超えると、後続のバージョンがキュー内のメッセージを引き続き表示できる理由がわかりません。結局のところ、最初の化身はキュー内のメッセージをロックする必要がありました。アプリの実行中にメッセージを受信するためにクエリマネージャーを使用してテストできたため、キューがロックされ、ブロックされました。
EAService.configの同時実行性の値を試してみました。min="0"とmax="1"に設定すると、アプリが実行されているように見える回数が2回に減りました。以前は、min="0"とmax="10"を使用して、アプリを実行していました。 18部のようでした。
私はそれが理にかなっていて、長さについて申し訳ないことを願っています。誰かがここで何が起こっているのか考えがありますか?.netアプリのコーディングを間違えましたか?
ありがとうマーティン
編集:エンジンの実行後に作成されるログを追加します。
2010-02-0809:31:39-メイン
2010-02-0809:31:39-受信:SubmitTask
2010-02-08 09:31:39-ProcessMsg
2010-02-08 09 :31: <Task><TaskID> 5</TaskID><TaskKey>2</TaskKey></Task>
39-2010 -02-08 09:31:39-DoWork2010-02-0809:31:39-
メールの送信2010-02-0809: 31:40-コミット
中...
2010-02-0809:31:40-睡眠
2010-02-0809:32:10-睡眠が完了しました。
2010-02-0809:32:10-メイン完了
2010-02-0809:32:10-外部でアクティブ化されたアプリケーションが成功し、終了します。
2010-02-0809:32:10-メイン
2010-02-0809:32:10-受信:SubmitTask
2010-02-08 09:32:10-ProcessMsg
2010-02-08 09 :32: <Task><TaskID> 5</TaskID><TaskKey> 2</TaskKey></Task>
10-2010 -02-08 09:32:10-DoWork2010-02-0809:32:10-
メールの送信
2010-02-0809:32:10-コミットしています...
2010-02-0809:32:10-スリープ
2010-02-0809:32:40-スリープが完了しました。
2010-02-0809:32:40-メイン完了
2010-02-0809:32:40-外部でアクティブ化されたアプリケーションが成功し、終了します。
アプリ全体を2回通過することがわかります(メイン、受信、実行、送信メール、完了)。
編集2:これは、ジョブがキューに送信されたときにアクティブ化されるストアドプロシージャ(debugginステートメントおよびすべて)の最新の化身です。
ALTER PROCEDURE [dbo].[pr_ProcessJob] AS BEGIN
DECLARE @message_type_name sysname
DECLARE @dialog uniqueidentifier
DECLARE @message_sequence_number bigint
DECLARE @error_message_sequence_number bigint
DECLARE @message_body xml
DECLARE @cgid uniqueidentifier
DECLARE @JobID int
DECLARE @Params varchar(MAX)
DECLARE @ErrorNumber bigint
DECLARE @ErrorText nvarchar(MAX)
DECLARE @TaskID int
DECLARE @TaskService varchar(100)
DECLARE @TaskKey int
DECLARE @chEngine uniqueidentifier
DECLARE @Step int
DECLARE @NextStep int
DECLARE @jobch uniqueidentifier
DECLARE @EngineMsg XML
DECLARE @TimeStarted datetime
DECLARE @TaskStatus int
-- This procedure will just sit in a loop processing Task messages in the queue
-- until the queue is empty
SET NOCOUNT ON
SET @error_message_sequence_number = -100
PRINT 'pr_ProcessJob: Start'
WHILE (1=1) BEGIN
BEGIN TRY
PRINT 'pr_ProcessJob: BEGIN TRANSACTION'
BEGIN TRANSACTION
-- first lets get the conversation group id for the next message.
WAITFOR (
GET CONVERSATION GROUP @cgid FROM [JobQueue]
), TIMEOUT 1000
IF (@@ROWCOUNT = 0) BEGIN
PRINT 'pr_ProcessJob: ROLLBACK TRANSACTION (GET CONVERSATION)'
ROLLBACK TRANSACTION
BREAK
END
PRINT @CGID
-- Inner Loop (Message Processing)
WHILE (1=1) BEGIN
-- Receive the next available message
PRINT 'Receiving Message.'
WAITFOR (
RECEIVE top(1) -- just handle one message at a time
@message_type_name=message_type_name, --the type of message received
@message_body=CAST(message_body AS XML), -- the message contents
@message_sequence_number=message_sequence_number,
@dialog = conversation_handle -- the identifier of the dialog this message was received on
FROM [JobQueue]
WHERE conversation_group_id=@cgid
), TIMEOUT 3000 -- if the queue is empty for three seconds, give up and go away
-- If we didn't get anything, the queue is empty so bail out
IF (@@ROWCOUNT = 0) BEGIN
PRINT 'pr_ProcessJob::WaitFor - No messages for conversation group bailing out'
BREAK
END --IF (@@ROWCOUNT = 0)
PRINT 'Message Received: ' + @message_type_name
SAVE TRANSACTION MessageReceivedSavePoint
-- Handle the End Conversation Message
IF (@message_type_name = 'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog') BEGIN
-- When we receive an End Dialog, we need to end also.
PRINT 'ENDING CONVERSATION'
END CONVERSATION @dialog
END -- IF (@message_type_name = 'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog') BEGIN
ELSE BEGIN
-- Handle the Conversation Error Message
IF (@message_type_name = 'http://schemas.microsoft.com/SQL/ServiceBroker/Error') BEGIN
-- We can't return anything here because the dialog at the other end is closed so just log
-- an error and close our end of the conversation.
PRINT 'ENDING CONVERSATION w/Error'
END CONVERSATION @dialog
END -- (@message_type_name = 'http://schemas.microsoft.com/SQL/ServiceBroker/Error')
ELSE BEGIN
IF (@message_type_name = 'SubmitJob') BEGIN -- Process normal Job messages..
PRINT 'pr_ProcessJob:: Message Type SubmitJob received.'
-- Pull the information out of the task message with XQuery
SELECT @JobID = @message_body.value('(/Job/JobID)[1]', 'int'),
@Params = @message_body.value('(/Job/Params)[1]', 'varchar(MAX)')
PRINT 'pr_ProcessJob::@JobID = ' + cast(@jobID as varchar(10))
PRINT 'pr_ProcessJob::@Params = ' + @Params
SELECT @ErrorNumber = 0, @ErrorText = N''
-- Do something with the job
-- save state
-- we are looking for the first step
SET @Step=1
PRINT 'Selecting from JobTask'
---------------------------------------------------------
-- Get the next task
---------------------------------------------------------
SELECT TOP 1
@TaskID=task.TaskID,
@TaskService=tt.TaskService,
@TaskKey =Task.TaskKey
FROM JobTask task INNER JOIN TaskType tt
ON task.TaskTypeID = tt.TaskTypeID
WHERE task.jobID=@JobID AND task.enabled=1 and task.step>=@step
ORDER BY Task.step
---------------------------------------------------------
PRINT 'Selecting from JobTask: complete'
PRINT 'Step='+cast(@step as varchar(max))
PRINT 'TaskID='+cast(@TaskID as varchar(max))
PRINT 'TaskService='+cast(@TaskService as varchar(max))
PRINT'TaskKey='+cast(@TaskKey as varchar(max))
PRINT 'BEGIN DIALOG with ' + @TaskService
BEGIN DIALOG @chEngine
FROM SERVICE [JobService]
TO SERVICE @TaskService
ON CONTRACT [TaskContract]
WITH RELATED_CONVERSATION=@dialog;
PRINT 'BEGIN DIALOG with ' + @TaskService+' completed.'
SET @EngineMsg = CAST('<Task><TaskID>'+ str(@TaskID)+'</TaskID><TaskKey>'+ str(@Taskkey)+'</TaskKey></Task>' as XML);
PRINT CAST(@EngineMsg as varchar(max))
PRINT 'Sending Message Type SubmitTask to Engine.';
SEND ON CONVERSATION @chEngine
MESSAGE TYPE SubmitTask
(@EngineMsg)
PRINT 'Inserting into jobstate'
INSERT INTO JobState(cgid, jobch, jobID, step) VALUES(@cgid, @dialog, @jobid, @step)
END -- IF (@message_type_name = 'SubmitJob')
ELSE BEGIN
IF (@message_type_name = 'TaskResponse') BEGIN
PRINT 'Processing MessageType TaskResponse'
SELECT @TaskStatus = @message_body.value('(/TaskStatus)[1]', 'int')
PRINT 'pr_ProcessJob::@TaskStatus = ' + cast(@TaskStatus as varchar(10))
PRINT 'Loading State'
--LoadState
SELECT @JobID=jobid,
@Step=Step,
@jobch=jobch,
@TimeStarted=sysdate
FROM Jobstate
WHERE cgid=@cgid
PRINT 'Loading State complete'
PRINT @jobch
PRINT 'Selecting from JobTask'
---------------------------------------------------------
-- Get the next task
---------------------------------------------------------
SELECT TOP 1
@TaskID=task.TaskID,
@TaskService=tt.TaskService,
@TaskKey =task.TaskKey,
@NextStep = task.Step
FROM JobTask task INNER JOIN TaskType tt
ON task.TaskTypeID = tt.TaskTypeID
WHERE task.jobID=@JobID AND task.enabled=1 and task.step>@step
ORDER BY Task.step
---------------------------------------------------------
PRINT 'Selecting from JobTask: complete'
PRINT 'NextTask: ['+@TaskService+']'
if (@TaskService is null) BEGIN
PRINT '@TaskService is NULL: BEGIN'
-- no more tasks
--END CONVERSATION @jobch
PRINT 'Removing from state table'
DELETE FROM JobState
WHERE @cgid=cgid
PRINT @@ROWCOUNT
PRINT 'Removing from state table-completed'
DECLARE @ResponseDoc xml
-- Send a response message saying we're done
DECLARE @Time nvarchar(100)
SET @Time = cast(getdate() as nvarchar(100))
DECLARE @TimeStartedText nvarchar(100)
SET @TimeStartedText = cast(@TimeStarted as nvarchar(100))
SET @ResponseDoc = N'<Job/>'
SET @ResponseDoc.modify(
'insert (<JobID>{ sql:variable("@JobID") }</JobID>,
<JobStatus>{ sql:variable("@ErrorNumber") }</JobStatus>,
<ErrorNumber>{ sql:variable("@ErrorNumber") }</ErrorNumber>,
<ErrorText>{ sql:variable("@ErrorText") }</ErrorText>,
<TimeStarted>{ sql:variable("@TimeStartedText") }</TimeStarted>,
<TimeCompleted>{ sql:variable("@Time") }</TimeCompleted>)
as last into /Job [1]');
SEND ON CONVERSATION @jobch
MESSAGE TYPE [JobResponse] (@ResponseDoc)
END CONVERSATION @jobch
PRINT '@TaskService is NULL: END'
END --if (@TaskService is null) BEGIN
ELSE BEGIN
-- there are more tasks
PRINT '@TaskService is not null: BEGIN'
PRINT 'BEGIN DIALOG with ' + @TaskService
--another task
BEGIN DIALOG @chEngine
FROM SERVICE [JobService]
TO SERVICE @TaskService
ON CONTRACT [TaskContract]
WITH RELATED_CONVERSATION=@dialog;
SET @EngineMsg = CAST('<Task><TaskID>'+ str(@TaskID)+'</TaskID><TaskKey>'+ str(@Taskkey)+'</TaskKey></Task>' as XML);
PRINT 'SEND ' +cast(@EngineMsg as varchar(max));
SEND ON CONVERSATION @chEngine
MESSAGE TYPE SubmitTask (@EngineMsg)
PRINT 'SAVING State: ' +str(@step)
-- save state
Update JobState
SET step = @NextStep
FROM JobState
WHERE cgid=@cgid
PRINT '@TaskService is not null: END'
END -- ELSE (@TaskService is NOT NULL)
PRINT 'Processing MessageType TaskResponse...Complete'
END -- IF (@message_type_name = 'TaskCompleted')
END -- ELSE IF (@message_type_name <> 'JobRequest')
END -- ELSE (@message_type_name <> 'http://schemas.microsoft.com/SQL/ServiceBroker/Error')
END -- ELSE (@message_type_name <> 'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog')
END -- WHILE (1=1) BEGIN
PRINT 'COMMIT TRANSACTION'
COMMIT TRANSACTION
END TRY
BEGIN CATCH
--rollback transaction
DECLARE @ErrNum int
DECLARE @ErrMsg varchar(max)
SELECT
ERROR_NUMBER() AS ErrorNumber
,ERROR_SEVERITY() AS ErrorSeverity
,ERROR_STATE() AS ErrorState
,ERROR_PROCEDURE() AS ErrorProcedure
,ERROR_LINE() AS ErrorLine
,ERROR_MESSAGE() AS ErrorMessage;
PRINT 'pr_ProcessJob: ROLLBACK (CATCH)'
if (error_number()=1205) BEGIN
-- a deadlock occurred. We can try it again.
PRINT 'pr_ProcessJob: ROLLBACK TRANSACTION (CATCH)'
ROLLBACK TRANSACTION
--CONTINUE
END --if (error_number()=1205)
ELSE BEGIN
if (error_number()=9617) BEGIN
PRINT 'pr_ProcessJob: ROLLBACK TRANSACTION (CATCH)'
ROLLBACK TRANSACTION
END
ELSE BEGIN -- (error_number()<>9617)
-- another error occurred. The message cant be procesed sucessfully
PRINT 'pr_ProcessJob: ROLLBACK TRANSACTION to MessageReceivedSavePoint (CATCH)'
ROLLBACK TRANSACTION MessageReceivedSavePoint
END --ELSE (error_number()<>9617)
END -- if (error_number()<>1205)
END CATCH
END -- while loop
PRINT 'pr_ProcessJob: Complete'
END -- CREATE PROCEDURE [dbo].[ProcessJobProc]