序章
Oracle Advanced Queuing メソッドを使用すると、Oracle SQL Server (正確には、Oracle Database 11g Enterprise Edition Release 11.2.0.4.0 - 64bit Production) で非常に奇妙な動作に直面しています。
問題
エラーは、X個のメッセージをキューに入れたことですが、dequeue_array はX+1個のメッセージを返し、最初のメッセージが重複しています (MessageId で確認できます)。
再現:
エラーを再現するための簡単な PoC を書くことができました。このコードは非常に単純です。エンキュー/デキューは標準の Oracle AQ です。このコードは、次の手順を 2 回実行します (テスト実行)。
- キュー テーブルをパージする
- X メッセージをキューに入れる
- dbms_aq.dequeue_array 呼び出しを使用してすべてのメッセージをデキューします
- デキューされたメッセージの数を確認する
新しい接続で POC を実行すると、最初の実行はエラーなしで成功しますが、その後の実行はすべて失敗します。その後、同じ接続を使用すると、スクリプトを実行するたびに、両方のテスト実行で失敗します。
私がこれまでに試したこと:
- 「新しい」接続でこのスクリプトを実行する: 最初の実行のみが失敗する
- 同じ接続でのスクリプトの追加実行: すべての実行が失敗する
- 新しいキューを使用/作成する場合: 最初の実行のみが失敗する
- dbms_aqadm.purge_queue_table() の代わりに「delete from <queue_table>」を使用する場合: すべて問題ありません
- 「通常の」1 つずつデキューを使用する場合: すべて問題ありません
結論:
この動作を説明することも、コードにエラーを見つけることもできません。ご覧ください。お気に入りの SQL クライアントで直接実行できるはずです (PL/SQL Developer でテスト済み)。
さらに詳しい情報が必要な場合や、PoC を機能させるのに問題がある場合は、質問してください。このスレッドを定期的にチェックします。何が起こっているかについての詳細な出力を含め、PoC をできるだけ読みやすいものにしようとしました。
コード:
declare
C_QueueName constant varchar2(32767) := 'TEST_QUEUE';
C_QueueTable constant varchar2(32767) := 'TEST_Q_TABLE';
C_MsgCount constant pls_integer := 1;
C_TestRuns constant pls_integer := 2;
C_DequeueArraySize constant pls_integer := 10;
/*
* Create the queue and the queue table used for theses tests
*/
procedure CreateQueueIfMissing is
L_Present pls_integer;
begin
dbms_output.put_line('START CreateQueueIfMissing');
execute immediate 'select count(*) from USER_OBJECTS where OBJECT_NAME = ''' || C_QueueName || ''' and OBJECT_TYPE = ''QUEUE''' into L_Present;
if L_Present = 1 then
dbms_output.put_line('Skipping queue creation, already present.');
dbms_output.put_line('END CreateQueueIfMissing');
return;
end if;
dbms_output.put_line(' Creating queue table ' || C_QueueTable);
DBMS_AQADM.CREATE_QUEUE_TABLE(queue_table => C_QueueTable
,storage_clause => 'LOGGING NOCACHE NOPARALLEL MONITORING'
,sort_list => 'priority,enq_time'
,multiple_consumers => false
,queue_payload_type => 'SYS.AQ$_JMS_BYTES_MESSAGE'
,comment => 'Queue for messages');
dbms_output.put_line(' Creating queue ' || C_QueueName);
DBMS_AQADM.CREATE_QUEUE(queue_name => C_QueueName
,queue_table => C_QueueTable
,max_retries => 8640
,retry_delay => 30
,comment => 'Queue for messages');
dbms_output.put_line(' Starting queue ' || C_QueueName);
DBMS_AQADM.START_QUEUE(queue_name => C_QueueName);
dbms_output.put_line('END CreateQueueIfMissing');
end CreateQueueIfMissing;
-- ================================================================================================
/*
* This procedure is the root of all evil.
* The error only occurs when using the purge_queue_tables procedure.
* When using a normal "delete from <queue_table>" then everything is just fine.
*/
procedure CleanQueueTable is
L_PurgeOptions dbms_aqadm.aq$_purge_options_t;
L_Count pls_integer;
begin
dbms_output.put_line('START CleanQueueTable');
execute immediate 'select count(*) from ' || C_QueueTable into L_Count;
dbms_output.put_line(' Messages in queue table BEFORE purge: ' || L_Count);
dbms_aqadm.purge_queue_table(queue_table => C_QueueTable
,purge_condition => null
,purge_options => L_PurgeOptions);
execute immediate 'select count(*) from ' || C_QueueTable into L_Count;
dbms_output.put_line(' Messages in queue table AFTER purge: ' || L_Count);
dbms_output.put_line('END CleanQueueTable');
end CleanQueueTable;
-- ================================================================================================
/*
* Enqueue the configured count of messages on the queue
*/
procedure EnqueueMessages is
L_BodyId pls_integer;
L_Msg sys.aq$_jms_bytes_message;
L_MsgId raw(16);
L_Count pls_integer;
L_EnqueueOptions DBMS_AQ.ENQUEUE_OPTIONS_T;
L_MessageProperties DBMS_AQ.MESSAGE_PROPERTIES_T;
begin
dbms_output.put_line('START EnqueueMessages');
execute immediate 'select count(*) from ' || C_QueueTable into L_Count;
dbms_output.put_line(' Messages in queue table BEFORE enqueue: ' || L_Count);
for i in 1 .. C_MsgCount
loop
dbms_output.put_line(' Construct #' || i);
L_Msg := sys.aq$_jms_bytes_message.construct;
-- set the JMS header
L_Msg.set_type('JmsBytesMessage');
L_Msg.set_userid(1);
L_Msg.set_appid('test');
L_Msg.set_groupid('cs');
L_Msg.set_groupseq(1);
-- set JMS message content
L_BodyId := L_Msg.clear_body(-1);
L_Msg.write_bytes(L_BodyId, to_blob(utl_raw.cast_to_raw('<test>Lorem Ipsum</test>')));
L_Msg.flush(L_BodyId);
L_Msg.clean(L_BodyId);
dbms_output.put_line(' Enqueue #' || i);
DBMS_AQ.ENQUEUE (queue_name => C_QueueName
,enqueue_options => L_EnqueueOptions
,message_properties => L_MessageProperties
,payload => L_Msg
,msgid => L_MsgId);
end loop;
execute immediate 'select count(*) from ' || C_QueueTable into L_Count;
dbms_output.put_line(' Messages in queue table AFTER enqueue: ' || L_Count);
dbms_output.put_line('END EnqueueMessages');
end EnqueueMessages;
-- ================================================================================================
/*
* Dequeues messages using dequeue_array from the configured queue.
*/
procedure DequeueMessages is
L_DequeueOptions dbms_aq.dequeue_options_t;
L_MsgPropArr dbms_aq.message_properties_array_t := dbms_aq.message_properties_array_t();
L_PayloadArr sys.aq$_jms_bytes_messages;
L_MsgIdArr dbms_aq.msgid_array_t;
L_MsgCnt pls_integer := 0;
L_Count pls_integer;
begin
dbms_output.put_line('START DequeueMessages');
execute immediate 'select count(*) from ' || C_QueueTable into L_Count;
dbms_output.put_line(' Messages in queue table BEFORE dequeue: ' || L_Count);
L_MsgCnt := dbms_aq.dequeue_array(queue_name => C_QueueName
,dequeue_options => L_DequeueOptions
,array_size => C_DequeueArraySize
,message_properties_array => L_MsgPropArr
,payload_array => L_PayloadArr
,msgid_array => L_MsgIdArr);
execute immediate 'select count(*) from ' || C_QueueTable into L_Count;
dbms_output.put_line(' Messages in queue table AFTER dequeue: ' || L_Count);
dbms_output.put_line(' Expected: ' || C_MsgCount || ', Received: ' || L_MsgCnt);
if C_MsgCount != L_MsgCnt then
dbms_output.put_line(' *****************************************');
dbms_output.put_line(' TOO MANY ITEMS DEQUEUED?!?');
dbms_output.put_line(' *****************************************');
for i in 1 .. L_MsgCnt
loop
dbms_output.put_line(' #' || i || ' MsdId=' || L_MsgIdArr(i));
end loop;
end if;
dbms_output.put_line('END DequeueMessages');
end DequeueMessages;
-- ================================================================================================
/*
* This is the testcase
*/
procedure RunTestCase is
begin
CreateQueueIfMissing;
for i in 1 .. C_TestRuns
loop
dbms_output.put_line(null);
dbms_output.put_line('=========== START test run #' || i || '===========');
CleanQueueTable;
EnqueueMessages;
DequeueMessages;
end loop;
end;
-- ================================================================================================
begin
RunTestCase;
end;
出力例:
START CreateQueueIfMissing
Skipping queue creation, already present.
END CreateQueueIfMissing
=========== START test run #1===========
START CleanQueueTable
Messages in queue table BEFORE purge: 0
Messages in queue table AFTER purge: 0
END CleanQueueTable
START EnqueueMessages
Messages in queue table BEFORE enqueue: 0
Construct #1
Enqueue #1
Messages in queue table AFTER enqueue: 1
END EnqueueMessages
START DequeueMessages
Messages in queue table BEFORE dequeue: 1
Messages in queue table AFTER dequeue: 0
Expected: 1, Received: 1
END DequeueMessages
=========== START test run #2===========
START CleanQueueTable
Messages in queue table BEFORE purge: 0
Messages in queue table AFTER purge: 0
END CleanQueueTable
START EnqueueMessages
Messages in queue table BEFORE enqueue: 0
Construct #1
Enqueue #1
Messages in queue table AFTER enqueue: 1
END EnqueueMessages
START DequeueMessages
Messages in queue table BEFORE dequeue: 1
Messages in queue table AFTER dequeue: 0
Expected: 1, Received: 2
*****************************************
TOO MANY ITEMS DEQUEUED?!?
*****************************************
#1 MsdId=2949A0FF2EE456A7E0540010E0467A30
#2 MsdId=2949A0FF2EE456A7E0540010E0467A30
END DequeueMessages