bratkartoffel

Reputation: 1177

dbms_aq.dequeue_array, first message is returned twice

Introduction

I'm seeing very strange behaviour on my Oracle database server (exactly: Oracle Database 11g Enterprise Edition Release 11.2.0.4.0 - 64bit Production) when using Oracle Advanced Queuing (AQ).

The problem

The problem is that I enqueue X messages, but dequeue_array returns X+1 messages, with the first message duplicated (it has the same MessageId).

Reproduce:

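I was able to write a simple PoC that reproduces the error. The code is pretty straightforward; the enqueue / dequeue part is standard Oracle AQ. After creating the queue (if it is missing), the code performs the following steps twice (two test runs):

1. Purge the queue table with dbms_aqadm.purge_queue_table.
2. Enqueue the configured number of messages (here: 1).
3. Dequeue with dbms_aq.dequeue_array and compare the number of received messages with the number enqueued.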

When running the PoC on a fresh connection, the first test run succeeds, but every following run fails. From then on, every time you execute the script on the same connection, it fails in both test runs.

What I tried so far:

Conclusion:

I can neither explain this behaviour nor find an error in my code. Please take a look at it; it should be directly executable in your favorite SQL client (tested with PL/SQL Developer).

If you need any further information or have problems getting the PoC to work, just ask; I'll check this thread regularly. I've tried to make the PoC as readable as possible, including verbose output about what is happening.

Code:

declare
   C_QueueName        constant varchar2(32767) := 'TEST_QUEUE';
   C_QueueTable       constant varchar2(32767) := 'TEST_Q_TABLE';
   C_MsgCount         constant pls_integer := 1;
   C_TestRuns         constant pls_integer := 2;
   C_DequeueArraySize constant pls_integer := 10;

   /*
    * Create the queue and the queue table used for these tests
   */
   procedure CreateQueueIfMissing is
      L_Present pls_integer;
   begin
      dbms_output.put_line('START CreateQueueIfMissing');

      execute immediate 'select count(*) from USER_OBJECTS where OBJECT_NAME = ''' || C_QueueName || ''' and OBJECT_TYPE = ''QUEUE''' into L_Present;
      if L_Present = 1 then
         dbms_output.put_line('Skipping queue creation, already present.');
         dbms_output.put_line('END CreateQueueIfMissing');
         return;
      end if;

      dbms_output.put_line('  Creating queue table ' || C_QueueTable);
      DBMS_AQADM.CREATE_QUEUE_TABLE(queue_table => C_QueueTable
         ,storage_clause     => 'LOGGING NOCACHE NOPARALLEL MONITORING'
         ,sort_list          => 'priority,enq_time'
         ,multiple_consumers => false
         ,queue_payload_type => 'SYS.AQ$_JMS_BYTES_MESSAGE'
         ,comment            => 'Queue for messages');

      dbms_output.put_line('  Creating queue ' || C_QueueName);
      DBMS_AQADM.CREATE_QUEUE(queue_name => C_QueueName
         ,queue_table => C_QueueTable
         ,max_retries => 8640
         ,retry_delay => 30
         ,comment => 'Queue for messages');

      dbms_output.put_line('  Starting queue ' || C_QueueName);
      DBMS_AQADM.START_QUEUE(queue_name => C_QueueName);
      dbms_output.put_line('END CreateQueueIfMissing');
   end CreateQueueIfMissing;
   -- ================================================================================================


   /*
    * This procedure is the root of all evil.
    * The error only occurs when using the purge_queue_table procedure.
    * When using a normal "delete from <queue_table>" then everything is just fine.
   */
   procedure CleanQueueTable is
      L_PurgeOptions dbms_aqadm.aq$_purge_options_t;
      L_Count        pls_integer;
   begin
      dbms_output.put_line('START CleanQueueTable');

      execute immediate 'select count(*) from ' ||  C_QueueTable into L_Count;
      dbms_output.put_line('  Messages in queue table BEFORE purge: ' || L_Count);

      dbms_aqadm.purge_queue_table(queue_table => C_QueueTable
         ,purge_condition => null
         ,purge_options   => L_PurgeOptions);

      execute immediate 'select count(*) from ' ||  C_QueueTable into L_Count;
      dbms_output.put_line('  Messages in queue table AFTER purge: ' || L_Count);

      dbms_output.put_line('END CleanQueueTable');
   end CleanQueueTable;
   -- ================================================================================================


   /*
    * Enqueue the configured count of messages on the queue
   */
   procedure EnqueueMessages is
      L_BodyId  pls_integer;
      L_Msg     sys.aq$_jms_bytes_message;
      L_MsgId   raw(16);
      L_Count   pls_integer;

      L_EnqueueOptions    DBMS_AQ.ENQUEUE_OPTIONS_T;
      L_MessageProperties DBMS_AQ.MESSAGE_PROPERTIES_T;
   begin
      dbms_output.put_line('START EnqueueMessages');

      execute immediate 'select count(*) from ' ||  C_QueueTable into L_Count;
      dbms_output.put_line('  Messages in queue table BEFORE enqueue: ' || L_Count);

      for i in 1 .. C_MsgCount
      loop
         dbms_output.put_line('    Construct #' || i);
         L_Msg := sys.aq$_jms_bytes_message.construct;

         -- set the JMS header
         L_Msg.set_type('JmsBytesMessage');
         L_Msg.set_userid(1);
         L_Msg.set_appid('test');
         L_Msg.set_groupid('cs');
         L_Msg.set_groupseq(1);

         -- set JMS message content
         L_BodyId := L_Msg.clear_body(-1);
         L_Msg.write_bytes(L_BodyId, to_blob(utl_raw.cast_to_raw('<test>Lorem Ipsum</test>')));
         L_Msg.flush(L_BodyId);
         L_Msg.clean(L_BodyId);

         dbms_output.put_line('    Enqueue #' || i);
         DBMS_AQ.ENQUEUE (queue_name => C_QueueName
            ,enqueue_options    => L_EnqueueOptions
            ,message_properties => L_MessageProperties
            ,payload            => L_Msg
            ,msgid              => L_MsgId);
      end loop;

      execute immediate 'select count(*) from ' ||  C_QueueTable into L_Count;
      dbms_output.put_line('  Messages in queue table AFTER enqueue: ' || L_Count);
      dbms_output.put_line('END EnqueueMessages');
   end EnqueueMessages;
   -- ================================================================================================


   /*
    * Dequeues messages using dequeue_array from the configured queue.
   */
   procedure DequeueMessages is
      L_DequeueOptions dbms_aq.dequeue_options_t;
      L_MsgPropArr     dbms_aq.message_properties_array_t := dbms_aq.message_properties_array_t();
      L_PayloadArr     sys.aq$_jms_bytes_messages;
      L_MsgIdArr       dbms_aq.msgid_array_t;

      L_MsgCnt         pls_integer := 0;
      L_Count          pls_integer;
   begin
      dbms_output.put_line('START DequeueMessages');

      execute immediate 'select count(*) from ' ||  C_QueueTable into L_Count;
      dbms_output.put_line('  Messages in queue table BEFORE dequeue: ' || L_Count);

      L_MsgCnt := dbms_aq.dequeue_array(queue_name => C_QueueName
         ,dequeue_options          => L_DequeueOptions
         ,array_size               => C_DequeueArraySize
         ,message_properties_array => L_MsgPropArr
         ,payload_array            => L_PayloadArr
         ,msgid_array              => L_MsgIdArr);

      execute immediate 'select count(*) from ' ||  C_QueueTable into L_Count;
      dbms_output.put_line('  Messages in queue table AFTER dequeue: ' || L_Count);

      dbms_output.put_line('  Expected: ' || C_MsgCount || ', Received: ' || L_MsgCnt);
      if C_MsgCount != L_MsgCnt then
         dbms_output.put_line('  *****************************************');
         dbms_output.put_line('  TOO MANY ITEMS DEQUEUED?!?');
         dbms_output.put_line('  *****************************************');
         for i in 1 .. L_MsgCnt
         loop
            dbms_output.put_line('    #' || i || ' MsdId=' || L_MsgIdArr(i));
         end loop;
      end if;
      dbms_output.put_line('END DequeueMessages');
   end DequeueMessages;
   -- ================================================================================================

   /*
    * This is the testcase
   */
   procedure RunTestCase is
   begin
      CreateQueueIfMissing;

      for i in 1 .. C_TestRuns
      loop
         dbms_output.put_line(null);
         dbms_output.put_line('=========== START test run #' || i || '===========');
         CleanQueueTable;
         EnqueueMessages;
         DequeueMessages;
      end loop;
   end;
   -- ================================================================================================
begin
   RunTestCase;
end;

Example output:

START CreateQueueIfMissing
Skipping queue creation, already present.
END CreateQueueIfMissing

=========== START test run #1===========
START CleanQueueTable
  Messages in queue table BEFORE purge: 0
  Messages in queue table AFTER purge: 0
END CleanQueueTable
START EnqueueMessages
  Messages in queue table BEFORE enqueue: 0
    Construct #1
    Enqueue #1
  Messages in queue table AFTER enqueue: 1
END EnqueueMessages
START DequeueMessages
  Messages in queue table BEFORE dequeue: 1
  Messages in queue table AFTER dequeue: 0
  Expected: 1, Received: 1
END DequeueMessages

=========== START test run #2===========
START CleanQueueTable
  Messages in queue table BEFORE purge: 0
  Messages in queue table AFTER purge: 0
END CleanQueueTable
START EnqueueMessages
  Messages in queue table BEFORE enqueue: 0
    Construct #1
    Enqueue #1
  Messages in queue table AFTER enqueue: 1
END EnqueueMessages
START DequeueMessages
  Messages in queue table BEFORE dequeue: 1
  Messages in queue table AFTER dequeue: 0
  Expected: 1, Received: 2
  *****************************************
  TOO MANY ITEMS DEQUEUED?!?
  *****************************************
    #1 MsdId=2949A0FF2EE456A7E0540010E0467A30
    #2 MsdId=2949A0FF2EE456A7E0540010E0467A30
END DequeueMessages
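The PoC only compares counts. To flag the duplicated MessageId directly, a helper along these lines could be added to the PoC's declare section (before DequeueMessages) and called as CountDuplicateMsgIds(L_MsgIdArr, L_MsgCnt) right after dequeue_array returns. This is a minimal sketch: the function name is hypothetical and not part of DBMS_AQ, and it only assumes that the msgid array can be indexed 1 .. L_MsgCnt, as the PoC already does.

```sql
   /*
    * Hypothetical helper: counts message ids that appear more than once
    * in the array returned by dbms_aq.dequeue_array.
   */
   function CountDuplicateMsgIds(P_MsgIdArr in dbms_aq.msgid_array_t
                                ,P_MsgCnt   in pls_integer) return pls_integer is
      -- set of already seen ids, keyed by the 32-character hex form of RAW(16)
      type T_SeenSet is table of boolean index by varchar2(32);
      L_Seen  T_SeenSet;
      L_Key   varchar2(32);
      L_Dupes pls_integer := 0;
   begin
      for i in 1 .. P_MsgCnt
      loop
         L_Key := rawtohex(P_MsgIdArr(i));
         if L_Seen.exists(L_Key) then
            -- id was already returned earlier in this batch
            L_Dupes := L_Dupes + 1;
            dbms_output.put_line('    Duplicate MsgId at #' || i || ': ' || L_Key);
         else
            L_Seen(L_Key) := true;
         end if;
      end loop;
      return L_Dupes;
   end CountDuplicateMsgIds;
```

With the failing run shown above, this should report one duplicate at position #2.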

Upvotes: 6

Views: 2901

Answers (2)

TenG

Reputation: 4004

Old post, but I was hitting this on an old 11g database. After upgrading to 12c we still had the same issue, and even after upgrading to 19c it remained. According to the MOS bug, this should have been resolved in 12c, but in our case it clearly still persisted in 19c.

I found no solution to this online, so I'm posting how we solved it here.

The issue was that DBMS_AQ.DEQUEUE_ARRAY randomly dequeued one or more records twice. Out of, say, 200,000 enqueued records, instead of dequeuing 200,000 we'd get between 200,001 and 200,005.

Bouncing (restarting) the database solved the issue, but that is never practical in production.

After much experimentation we found that moving the COMMIT from inside the loop that processes the array to outside it (i.e. in the 200,000 example, issuing one COMMIT at the end rather than after every array batch) fixed the issue. However, one of our processes enqueues over 5 million rows, and when we used the same approach there, performance was simply dreadful.

Again after much experimentation, we found that changing DEQUEUE_OPTIONS.VISIBILITY to IMMEDIATE and DEQUEUE_OPTIONS.NAVIGATION to NEXT_RECORD solved the problem. We also added a COMMIT every 10 minutes inside the loop, just to be sure.
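A minimal sketch of the dequeue loop described above, written against TEST_QUEUE and the JMS bytes payload from the question. Assumptions: DBMS_AQ has no constant named NEXT_RECORD, so the sketch uses dbms_aq.NEXT_MESSAGE (which is also the documented default navigation); the batch size, NO_WAIT, and ending the loop on ORA-25228 are illustrative choices, not taken from the answer. With IMMEDIATE visibility each dequeue runs as its own autonomous transaction, so the periodic COMMIT only covers the processing work, and a failure during processing does not put the messages back on the queue.

```sql
declare
   C_QueueName   constant varchar2(128) := 'TEST_QUEUE';    -- queue from the question's PoC
   C_BatchSize   constant pls_integer   := 1000;            -- assumption: any array size works
   C_CommitEvery constant number        := 10 / (24 * 60);  -- 10 minutes, expressed in days

   -- ORA-25228: timeout or end-of-fetch during message dequeue
   E_NoMoreMessages exception;
   pragma exception_init(E_NoMoreMessages, -25228);

   L_DequeueOptions dbms_aq.dequeue_options_t;
   L_MsgPropArr     dbms_aq.message_properties_array_t := dbms_aq.message_properties_array_t();
   L_PayloadArr     sys.aq$_jms_bytes_messages;
   L_MsgIdArr       dbms_aq.msgid_array_t;
   L_MsgCnt         pls_integer;
   L_LastCommit     date := sysdate;
begin
   L_DequeueOptions.visibility := dbms_aq.IMMEDIATE;     -- dequeue commits on its own
   L_DequeueOptions.navigation := dbms_aq.NEXT_MESSAGE;  -- assumed meaning of "NEXT_RECORD"
   L_DequeueOptions.wait       := dbms_aq.NO_WAIT;       -- raise ORA-25228 when queue is empty

   loop
      begin
         L_MsgCnt := dbms_aq.dequeue_array(queue_name => C_QueueName
            ,dequeue_options          => L_DequeueOptions
            ,array_size               => C_BatchSize
            ,message_properties_array => L_MsgPropArr
            ,payload_array            => L_PayloadArr
            ,msgid_array              => L_MsgIdArr);
      exception
         when E_NoMoreMessages then
            exit;  -- queue drained, leave the outer loop
      end;

      for i in 1 .. L_MsgCnt
      loop
         null;  -- process L_PayloadArr(i) here
      end loop;

      -- commit the processing work at most every 10 minutes
      if sysdate - L_LastCommit >= C_CommitEvery then
         commit;
         L_LastCommit := sysdate;
      end if;
   end loop;

   commit;  -- final commit for any remaining processing work
end;
```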

Upvotes: 1

Alex Poole

Reputation: 191520

This looks like bug 20659700. There's a bit more information in My Oracle Support document 2002148.1.

You (or your DBA) should raise a service request to confirm that, and see if a patch is available for your platform.

Upvotes: 2
