Using advanced queueing v8.0.5.1

EDB Postgres Advanced Server advanced queueing provides message queueing and message processing for the EDB Postgres Advanced Server database. User-defined messages are stored in a queue. A collection of queues is stored in a queue table. Create a queue table before creating a queue that depends on it.

On the server side, procedures in the DBMS_AQADM package create and manage message queues and queue tables. Use the DBMS_AQ package to add messages to or remove messages from a queue or register or unregister a PL/SQL callback procedure. For more information about DBMS_AQ and DBMS_AQADM, see DBMS_AQ.

On the client side, the application uses the EDB.NET driver to enqueue and dequeue messages.

Enqueueing or dequeueing a message

For more information about using EDB Postgres Advanced Server's advanced queueing functionality, see Built-in packages.

Server-side setup

To use advanced queueing functionality on your .NET application, you must first create a user-defined type, queue table, and queue, and then start the queue on the database server. Invoke EDB-PSQL and connect to the EDB Postgres Advanced Server host database. Use the following SPL commands at the command line.

Creating a user-defined type

To specify a RAW data type, create a user-defined type. This example shows creating a user-defined type named myxml:

CREATE TYPE myxml AS (value XML);

Creating the queue table

A queue table can hold multiple queues with the same payload type. This example shows creating a table named MSG_QUEUE_TABLE:

EXEC DBMS_AQADM.CREATE_QUEUE_TABLE
      (queue_table => 'MSG_QUEUE_TABLE',
       queue_payload_type => 'myxml',
       comment => 'Message queue table');
END;

Creating the queue

This example shows creating a queue named MSG_QUEUE in the table MSG_QUEUE_TABLE:

BEGIN
DBMS_AQADM.CREATE_QUEUE ( queue_name => 'MSG_QUEUE', queue_table => 'MSG_QUEUE_TABLE', comment => 'This queue contains pending messages.');
END;

Starting the queue

Once the queue is created, invoke the following SPL code at the command line to start a queue in the EDB database:

BEGIN
DBMS_AQADM.START_QUEUE
(queue_name => 'MSG_QUEUE');
END;

Client-side example

Once you've created a user-defined type, the queue table, and the queue, start the queue. Then, you can enqueue or dequeue a message using EDB .Net drivers.

Enqueue a message

To enqueue a message on your .NET application, you must:

  1. Import the EnterpriseDB.EDBClient namespace.
  2. Pass the name of the queue and create the instance of the EDBAQQueue.
  3. Create the enqueue message and define a payload.
  4. Call the queue.Enqueue method.

The following code shows using the queue.Enqueue method.

Note

This code creates the message and serializes it. This is example code and doesn't compile if copied as it is. You must serialize the message as XML.

using EnterpriseDB.EDBClient;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace AQXml
{
    class MyXML
    {
        public string value { get; set; }
    }
    class Program
    {
        static void Main(string[] args)
        {
            int messagesToSend = 1;
            if (args.Length > 0 && !string.IsNullOrEmpty(args[0]))
            {
                messagesToSend = int.Parse(args[0]);
            }
            for (int i = 0; i < 5; i++)
            {
                EnqueMsg("test message: " + i);
            }
        }

      private static EDBConnection GetConnection()
        {
            string connectionString = "Server=127.0.0.1;Host=127.0.0.1;Port=5444;User Id=enterprisedb;Password=test;Database=edb;Timeout=999";
            EDBConnection connection = new EDBConnection(connectionString);
            connection.Open();
            return connection;
        }


        private static string ByteArrayToString(byte[] byteArray)
        {
            // Sanity check if it's null so we don't incur overhead of an exception
            if (byteArray == null)
            {
                return string.Empty;
            }
            try
            {
                StringBuilder hex = new StringBuilder(byteArray.Length * 2);
                foreach (byte b in byteArray)
                {
                    hex.AppendFormat("{0:x2}", b);
                }

                return hex.ToString().ToUpper();
            }
            catch
            {
                return string.Empty;
            }
        }

        private static bool EnqueMsg(string msg)
        {
            EDBConnection con = GetConnection();
            using (EDBAQQueue queue = new EDBAQQueue("MSG_QUEUE", con))
            {
                queue.MessageType = EDBAQMessageType.Xml;
                EDBTransaction txn = queue.Connection.BeginTransaction();
                QueuedEntities.Message queuedMessage = new QueuedEntities.Message() { MessageText = msg };

                try
                {
                    string rootElementName = queuedMessage.GetType().Name;
                    if (rootElementName.IndexOf(".") != -1)
                    {
                        rootElementName = rootElementName.Split('.').Last();
                    }

                    string xml = new Utils.XmlFragmentSerializer<QueuedEntities.Message>().Serialize(queuedMessage);
                    EDBAQMessage queMsg = new EDBAQMessage();
                    queMsg.Payload = new MyXML { value = xml };
                    queue.MessageType = EDBAQMessageType.Udt;
                    queue.UdtTypeName = "myxml";
                    EDBConnection.GlobalTypeMapper.MapComposite<MyXML>("myxml");
                    con.ReloadTypes();
                    queue.Enqueue(queMsg);
                    var messageId = ByteArrayToString((byte[])queMsg.MessageId);
                    Console.WriteLine("MessageID: " + messageId);
                    txn.Commit();
                    queMsg = null;
                    xml = null;
                    rootElementName = null;
                    return true;
                }
                catch (Exception ex)
                {
                    txn?.Rollback();
                    Console.WriteLine("Failed to enqueue message.");
                    Console.WriteLine(ex.ToString());
                    return false;
                }
                finally
                {
                    queue?.Connection?.Dispose();
                }
            }
        }

    }
}

Dequeueing a message

To dequeue a message on your .NET application, you must:

  1. Import the EnterpriseDB.EDBClient namespace.
  2. Pass the name of the queue and create the instance of the EDBAQQueue.
  3. Call the queue.Dequeue method.
Note

The following code creates the message and serializes it. This is example code and doesn't compile if copied as it is. You must serialize the message as XML.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using EnterpriseDB.EDBClient;

namespace DequeueXML
{
    class MyXML
    {
        public string value { get; set; }
    }
    class Program
    {
        static void Main(string[] args)
        {
            DequeMsg();
        }


        private static EDBConnection GetConnection()
        {
            string connectionString = "Server=127.0.0.1;Host=127.0.0.1;Port=5444;User Id=enterprisedb;Password=test;Database=edb;Timeout=999";
            EDBConnection connection = new EDBConnection(connectionString);
            connection.Open();
            return connection;
        }


        private static string ByteArrayToString(byte[] byteArray)
        {
            // Sanity check if it's null so we don't incur overhead of an exception
            if (byteArray == null)
            {
                return string.Empty;
            }
            try
            {
                StringBuilder hex = new StringBuilder(byteArray.Length * 2);
                foreach (byte b in byteArray)
                {
                    hex.AppendFormat("{0:x2}", b);
                }

                return hex.ToString().ToUpper();
            }
            catch
            {
                return string.Empty;
            }
        }
        public static void DequeMsg(int waitTime = 10)
        {
            EDBConnection con = GetConnection();
            using (EDBAQQueue queueListen = new EDBAQQueue("MSG_QUEUE", con))
            {
                queueListen.UdtTypeName = "myxml";
                queueListen.DequeueOptions.Navigation = EDBAQNavigationMode.FIRST_MESSAGE;
                queueListen.DequeueOptions.Visibility = EDBAQVisibility.ON_COMMIT;
                queueListen.DequeueOptions.Wait = 1;
                EDBTransaction txn = null;

                while (1 == 1)
                    {

                    if (queueListen.Connection.State == System.Data.ConnectionState.Closed)
                    {
                        queueListen.Connection.Open();
                    }

                    string messageId = "Unknown";
                    try
                    {
                        // the listen function is a blocking function. It will Wait the specified waitTime or until a
                        // message is received.
                        Console.WriteLine("Listening...");
                        string v = queueListen.Listen(null, waitTime);
                        // If we are waiting for a message and we specify a Wait time,
                        // then if there are no more messages, we want to just bounce out.
                        if (waitTime > -1 && v == null)
                        {
                            Console.WriteLine("No message received during Wait period.");
                            Console.WriteLine();
                            continue;
                        }

                        // once we're here that means a message has been detected in the queue. Let's deal with it.
                        txn = queueListen.Connection.BeginTransaction();

                        Console.WriteLine("Attempting to dequeue message...");
                        // dequeue the message
                        EDBAQMessage deqMsg;
                        try
                        {
                            deqMsg = queueListen.Dequeue();
                        }
                        catch (Exception ex)
                        {
                            if (ex.Message.Contains("ORA-25228"))
                            {
                                Console.WriteLine("Message was not there.  Another process must have picked it up.");
                                Console.WriteLine();
                                txn.Rollback();
                                continue;
                            }
                            else
                            {
                                throw;
                            }
                        }

                        messageId = ByteArrayToString((byte[])deqMsg.MessageId);
                        if (deqMsg != null)
                        {
                            Console.WriteLine("Processing received message...");
                            // process the message payload
                            MyXML obj = (MyXML) deqMsg.Payload;

                            QueuedEntities.Message msg = new Utils.XmlFragmentSerializer<QueuedEntities.Message>().Deserialize(obj.value);

                            Console.WriteLine("Received Message:");
                            Console.WriteLine("MessageID: " + messageId);
                            Console.WriteLine("Message: " + msg.MessageText);
                            Console.WriteLine("Enqueue Time" + queueListen.MessageProperties.EnqueueTime);

                            txn.Commit();

                            Console.WriteLine("Finished processing message");
                            Console.WriteLine();

                        }
                        else
                        {
                            Console.WriteLine("Message was not dequeued.");
                        }
                    }
                    catch (Exception ex)
                    {
                        Console.WriteLine("Failed To dequeue or process the dequeued message.");
                        Console.WriteLine(ex.ToString());
                        Console.WriteLine();
                        if (txn != null)
                        {
                            txn.Rollback();
                            if (txn != null)
                            {
                                txn.Dispose();
                            }
                        }
                    }
                }
            }

        }
    }
}

EDBAQ classes

The following EDBAQ classes are used in this application.

EDBAQDequeueMode

The EDBAQDequeueMode class lists all the dequeuer modes available.

ValueDescription
BrowseReads the message without locking.
LockedReads and gets a write lock on the message.
RemoveDeletes the message after reading. This is the default value.
Remove_NoDataConfirms receipt of the message.

EDBAQDequeueOptions

The EDBAQDequeueOptions class lists the options available when dequeuing a message.

PropertyDescription
ConsumerNameThe name of the consumer for which to dequeue the message.
DequeueModeSet from EDBAQDequeueMode. It represents the locking behavior linked with the dequeue option.
NavigationSet from EDBAQNavigationMode. It represents the position of the message to fetch.
VisibilitySet from EDBAQVisibility. It represents whether the new message is dequeued as part of the current transaction.
WaitThe wait time for a message as per the search criteria.
MsgidThe message identifier.
CorrelationThe correlation identifier.
DeqConditionThe dequeuer condition. It's a Boolean expression.
TransformationThe transformation to apply before dequeuing the message.
DeliveryModeThe delivery mode of the dequeued message.

EDBAQEnqueueOptions

The EDBAQEnqueueOptions class lists the options available when enqueuing a message.

PropertyDescription
VisibilitySet from EDBAQVisibility. It represents whether the new message is enqueued as part of the current transaction.
RelativeMsgidThe relative message identifier.
SequenceDeviationThe sequence when to dequeue the message.
TransformationThe transformation to apply before enqueuing the message.
DeliveryModeThe delivery mode of the enqueued message.

EDBAQMessage

The EDBAQMessage class lists a message to enqueue/dequeue.

PropertyDescription
PayloadThe actual message to queue.
MessageIdThe ID of the queued message.

EDBAQMessageProperties

The EDBAQMessageProperties lists the message properties available.

PropertyDescription
PriorityThe priority of the message.
DelayThe duration after which the message is available for dequeuing, in seconds.
ExpirationThe duration for which the message is available for dequeuing, in seconds.
CorrelationThe correlation identifier.
AttemptsThe number of attempts taken to dequeue the message.
RecipientListThe recipients list that overthrows the default queue subscribers.
ExceptionQueueThe name of the queue to move the unprocessed messages to.
EnqueueTimeThe time when the message was enqueued.
StateThe state of the message while dequeued.
OriginalMsgidThe message identifier in the last queue.
TransactionGroupThe transaction group for the dequeued messages.
DeliveryModeThe delivery mode of the dequeued message.

EDBAQMessageState

The EDBAQMessageState class represents the state of the message during dequeue.

ValueDescription
ExpiredThe message is moved to the exception queue.
ProcessedThe message is processed and kept.
ReadyThe message is ready to be processed.
WaitingThe message is in waiting state. The delay isn't reached.

EDBAQMessageType

The EDBAQMessageType class represents the types for payload.

ValueDescription
RawThe raw message type.

Note: Currently, this payload type isn't supported.
UDTThe user-defined type message.
XMLThe XML type message.

Note: Currently, this payload type isn't supported.

EDBAQNavigationMode

The EDBAQNavigationMode class represents the different types of navigation modes available.

ValueDescription
First_MessageReturns the first available message that matches the search terms.
Next_MessageReturns the next available message that matches the search items.
Next_TransactionReturns the first message of next transaction group.

EDBAQQueue

The EDBAQQueue class represents a SQL statement to execute DMBS_AQ functionality on a PostgreSQL database.

PropertyDescription
ConnectionThe connection to use.
NameThe name of the queue.
MessageTypeThe message type that's enqueued/dequeued from this queue, for example EDBAQMessageType.Udt.
UdtTypeNameThe user-defined type name of the message type.
EnqueueOptionsThe enqueue options to use.
DequeuOptionsThe dequeue options to use.
MessagePropertiesThe message properties to use.

EDBAQVisibility

The EDBAQVisibility class represents the visibility options available.

ValueDescription
ImmediateThe enqueue/dequeue isn't part of the ongoing transaction.
On_CommitThe enqueue/dequeue is part of the current transaction.
Note
  • To review the default options for these parameters, see DBMS_AQ.
  • EDB advanced queueing functionality uses user-defined types for calling enqueue/dequeue operations. Server Compatibility Mode=NoTypeLoading can't be used with advanced queueing because NoTypeLoading doesn't load any user-defined types.