How to enqueue on Oracle AQ table on commit with Java and consume with a JMS client How to enqueue on Oracle AQ table on commit with Java and consume with a JMS client oracle oracle

How to enqueue on Oracle AQ table on commit with Java and consume with a JMS client


I was able to accomplish this - I had to guess at many parts of the Oracle API and collect hints from various blogs. For anyone interested, here is the way I got it working - 1. I created an Oracle Object on the Oracle DB. 2. With this Oracle Object, I created queue tables with the object type as the payload. 3. I am now able to enqueue AQMessage types with a STRUCT payload containing the object data. 4. And I am able to dequeue with a JMS consumer that understands the ADT payload type (Thanks to the article at http://blog.javaforge.net/post/30858904340/oracle-advanced-queuing-spring-custom-types)

Here are the steps with code - Create the Oracle object. The object can have any primary data type fields like VARCHAR, TIMESTAMP etc and also BLOB, CLOB etc. In this case I provided one of the columns as a blob to make things more complicated.

-- Object type with a string identifier and a binary body.
CREATE OR REPLACE TYPE aq_event_obj AS OBJECT (
    id      VARCHAR2(100),
    payload BLOB
);
COMMIT;

Now create the queue table. The payload type of the table is the oracle object.

/**
 * Creates and starts the single-consumer queue by running three anonymous
 * PL/SQL blocks, in order, through {@code doUpdateDatabase}:
 * create the queue table {@code OBJ_SINGLE_QUEUE_TABLE} (payload type
 * {@code AQ_EVENT_OBJ}), create the queue {@code OBJ_SINGLE_QUEUE} on that
 * table, then start the queue.
 *
 * @param conn connection handed to {@code doUpdateDatabase} for every block
 * @throws SQLException propagated from {@code doUpdateDatabase}
 */
private void setup(Connection conn) throws SQLException {
    final String createQueueTable = "BEGIN "
            + "DBMS_AQADM.CREATE_QUEUE_TABLE( "
            + "   QUEUE_TABLE        =>  'OBJ_SINGLE_QUEUE_TABLE',  "
            + "   QUEUE_PAYLOAD_TYPE =>  'AQ_EVENT_OBJ', "
            + "   COMPATIBLE         =>  '10.0'); "
            + "END; ";

    final String createQueue = "BEGIN "
            + "DBMS_AQADM.CREATE_QUEUE( "
            + "    QUEUE_NAME     =>   'OBJ_SINGLE_QUEUE', "
            + "    QUEUE_TABLE    =>   'OBJ_SINGLE_QUEUE_TABLE'); "
            + "END;  ";

    final String startQueue = "BEGIN "
            + "  DBMS_AQADM.START_QUEUE('OBJ_SINGLE_QUEUE'); "
            + "END; ";

    // Order matters: the table must exist before the queue, the queue before it is started.
    final String[] adminBlocks = { createQueueTable, createQueue, startQueue };
    for (String plsql : adminBlocks) {
        doUpdateDatabase(conn, plsql);
    }
}

You can now enqueue AQMessage types in Java with a struct instance of the object

public void enqueueMessage(OracleConnection conn, String correlationId, byte[] payloadData) throws Exception {    // First create the message properties:    AQMessageProperties aqMessageProperties = AQFactory.createAQMessageProperties();    aqMessageProperties.setCorrelation(correlationId);    aqMessageProperties.setExceptionQueue(EXCEPTION_QUEUE_NAME);    // Specify an agent as the sender:    AQAgent aqAgent = AQFactory.createAQAgent();    aqAgent.setName(SENDER_NAME);    aqAgent.setAddress(QUEUE_NAME);    aqMessageProperties.setSender(aqAgent);    // Create the payload    StructDescriptor structDescriptor = StructDescriptor.createDescriptor(EVENT_OBJECT, conn);    Map<String, Object> payloadMap = new HashMap<String, Object>();    payloadMap.put("ID", correlationId);    payloadMap.put("PAYLOAD", new OracleAQBLOBUtil().createBlob(conn, payloadData));    STRUCT struct = new STRUCT(structDescriptor, conn, payloadMap);    // Create the actual AQMessage instance:    AQMessage aqMessage = AQFactory.createAQMessage(aqMessageProperties);    aqMessage.setPayload(struct);    AQEnqueueOptions opt = new AQEnqueueOptions();    opt.setDeliveryMode(AQEnqueueOptions.DeliveryMode.PERSISTENT);    opt.setVisibility(AQEnqueueOptions.VisibilityOption.ON_COMMIT);    // execute the actual enqueue operation:    conn.enqueue(QUEUE_NAME, opt, aqMessage);}

The blob field needed special handling

/**
 * Helper for moving byte arrays into and out of Oracle {@code BLOB} values.
 */
public class OracleAQBLOBUtil {

    /** Chunk size used when draining a BLOB stream into memory. */
    private static final int READ_BUFFER_SIZE = 8192;

    /**
     * Creates a session-duration temporary BLOB and writes {@code payload} into it
     * starting at position 1.
     *
     * @param conn    connection the temporary BLOB is created on
     * @param payload bytes to store; must not be null
     * @return the populated temporary BLOB
     * @throws NullPointerException if {@code payload} is null (checked before any
     *                              database resource is allocated)
     * @throws Exception            if creating or writing the BLOB fails; the
     *                              temporary BLOB is freed before the error propagates
     */
    public BLOB createBlob(OracleConnection conn, byte[] payload) throws Exception {
        if (payload == null) {
            throw new NullPointerException("payload must not be null");
        }
        BLOB blob = BLOB.createTemporary(conn, false, BLOB.DURATION_SESSION);
        boolean populated = false;
        try {
            OutputStream outputStream = blob.setBinaryStream(1L);
            try {
                outputStream.write(payload);
            } finally {
                outputStream.close();
            }
            populated = true;
            return blob;
        } finally {
            if (!populated) {
                // Do not leave a half-written temporary LOB allocated for the session.
                releaseQuietly(blob);
            }
        }
    }

    /**
     * Reads the whole content of {@code blob} into a byte array.
     * The BLOB's binary stream is always closed before returning.
     *
     * @param blob BLOB to read; must not be null
     * @return the BLOB content
     * @throws Exception if opening or reading the BLOB stream fails
     */
    public byte[] saveOutputStream(BLOB blob) throws Exception {
        InputStream inputStream = blob.getBinaryStream();
        try {
            ByteArrayOutputStream collected = new ByteArrayOutputStream();
            byte[] buffer = new byte[READ_BUFFER_SIZE];
            int bytesRead;
            while ((bytesRead = inputStream.read(buffer)) != -1) {
                collected.write(buffer, 0, bytesRead);
            }
            return collected.toByteArray();
        } finally {
            inputStream.close();
        }
    }

    /** Frees a temporary BLOB, ignoring failures so the original error is not masked. */
    private static void releaseQuietly(BLOB blob) {
        try {
            blob.freeTemporary();
        } catch (java.sql.SQLException ignored) {
            // Best-effort cleanup on an already-failing path.
        }
    }
}

For the consumer, you need to provide an instance of ORADataFactory that lets the consumer understand the payload type (your custom object).

// The plain JMS session is narrowed to the Oracle AQ session type, which
// offers createReceiver with a payload factory argument.
AQjmsSession queueSession = (AQjmsSession) session;
Queue queue = (Queue) ctx.lookup(queueName);

// The factory handed to the receiver is the custom payload class itself.
OracleAQObjORADataFactory payloadFactory = new OracleAQObjORADataFactory();
MessageConsumer receiver = queueSession.createReceiver(queue, payloadFactory);

Where the code for OracleAQObjORADataFactory is

import java.io.ByteArrayOutputStream;import java.io.InputStream;import java.sql.Connection;import java.sql.SQLException;import oracle.jdbc.OracleTypes;import oracle.jpub.runtime.MutableStruct;import oracle.sql.BLOB;import oracle.sql.Datum;import oracle.sql.ORAData;import oracle.sql.ORADataFactory;import oracle.sql.STRUCT;public class OracleAQObjORADataFactory  implements ORAData, ORADataFactory {    public static final String EVENT_OBJECT = "SYSTEM.AQ_EVENT_OBJ";    public static final int _SQL_TYPECODE = OracleTypes.STRUCT;    protected MutableStruct _struct;    protected static int[] _sqlType = { java.sql.Types.VARCHAR, java.sql.Types.VARBINARY };    protected static ORADataFactory[] _factory = new ORADataFactory[2];    protected static final OracleAQObjORADataFactory  _AqEventObjFactory = new OracleAQObjORADataFactory ();    public static ORADataFactory getORADataFactory() {        return _AqEventObjFactory;    }    /* constructors */    protected void _init_struct(boolean init) {        if (init)            _struct = new MutableStruct(new Object[2], _sqlType, _factory);    }    public OracleAQObjORADataFactory () {        _init_struct(true);    }    public OracleAQObjORADataFactory (String id, byte[] payload) throws SQLException {        _init_struct(true);        setId(id);        setPayload(payload);    }    /* ORAData interface */    public Datum toDatum(Connection c) throws SQLException {        return _struct.toDatum(c, EVENT_OBJECT);    }    /* ORADataFactory interface */    public ORAData create(Datum d, int sqlType) throws SQLException {        return create(null, d, sqlType);    }    protected ORAData create(OracleAQObjORADataFactory  o, Datum d, int sqlType) throws SQLException {        if (d == null)            return null;        if (o == null)            o = new OracleAQObjORADataFactory ();        o._struct = new MutableStruct((STRUCT) d, _sqlType, _factory);        return o;    }    public String getId() throws SQLException {        return 
(String) _struct.getAttribute(0);    }    public void setId(String id) throws SQLException {        _struct.setAttribute(0, id);    }    public byte[] getPayload() throws SQLException {        BLOB blob = (BLOB) _struct.getAttribute(1);        InputStream inputStream = blob.getBinaryStream();        return getBytes(inputStream);    }    public byte[] getBytes(InputStream body) {        int c;        try {            ByteArrayOutputStream f = new ByteArrayOutputStream();            while ((c = body.read()) > -1) {                f.write(c);            }            f.close();            byte[] result = f.toByteArray();            return result;        }        catch (Exception e) {            System.err.println("Exception: " + e.getMessage());            e.printStackTrace();            return null;        }    }    public void setPayload(byte[] payload) throws SQLException {        _struct.setAttribute(1, payload);    }}

You're probably using Camel or Spring in your project, in which case - 1. If you're on Camel 2.10.2 or upwards, you can create a JMS consumer with a custom message listener container (CAMEL-5676). 2. If you're on a previous version then you may not be able to use the endpoint way (I couldn't figure it out), but you can use a JMS request listener

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:jms="http://www.springframework.org/schema/jms"
    xmlns:p="http://www.springframework.org/schema/p"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
                        http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
                        http://www.springframework.org/schema/jms
                        http://www.springframework.org/schema/jms/spring-jms-3.0.xsd">

    <!-- this is just an example, you can also use a datasource as the ctor arg -->
    <bean id="connectionFactoryOracleAQQueue" class="oracle.jms.AQjmsFactory" factory-method="getQueueConnectionFactory">
        <constructor-arg index="0">
            <value>jdbc:oracle:thin:@blrub442:1522:UB23</value>
        </constructor-arg>
        <constructor-arg index="1" type="java.util.Properties">
            <value></value>
        </constructor-arg>
    </bean>

    <!-- Wraps the AQ connection factory so connections are opened with the credentials below.
         NOTE(review): credentials are inline for the example only. -->
    <bean id="oracleQueueCredentials" class="org.springframework.jms.connection.UserCredentialsConnectionFactoryAdapter">
        <property name="targetConnectionFactory">
            <ref bean="connectionFactoryOracleAQQueue" />
        </property>
        <property name="username">
            <value>system</value>
        </property>
        <property name="password">
            <value>oracle</value>
        </property>
    </bean>

    <!-- Definitions for JMS Listener classes that we have created -->
    <bean id="aqMessageListener" class="com.misys.test.JmsRequestListener" />

    <!-- Destination bean. The queue name is the one created and started with
         DBMS_AQADM (OBJ_SINGLE_QUEUE); change it if your queue is named differently. -->
    <bean id="aqEventQueue" class="com.misys.test.OracleAqQueueFactoryBean">
        <property name="connectionFactory" ref="oracleQueueCredentials" />
        <property name="oracleQueueName" value="OBJ_SINGLE_QUEUE" />
    </bean>

    <!-- The Spring DefaultMessageListenerContainer configuration.
         This bean is automatically loaded when the JMS application context is started -->
    <bean id="jmsContainer" class="com.misys.test.AQMessageListenerContainer" scope="singleton">
        <property name="connectionFactory" ref="oracleQueueCredentials" />
        <property name="destination" ref="aqEventQueue" />
        <property name="messageListener" ref="aqMessageListener" />
        <property name="sessionTransacted" value="false" />
    </bean>
</beans>

The custom message listener container

/**
 * Listener container whose consumers are created through the Oracle AQ session
 * API, passing the {@code OracleAQObjORADataFactory} payload factory.
 */
public class AQMessageListenerContainer extends DefaultMessageListenerContainer {

    /**
     * Creates the consumer via {@code AQjmsSession.createConsumer}, using this
     * container's message selector and no-local setting.
     *
     * @param session     session to create the consumer on; cast to {@code AQjmsSession}
     * @param destination destination to consume from
     * @return the consumer created by the AQ session
     * @throws JMSException if the AQ session fails to create the consumer
     */
    @Override
    protected MessageConsumer createConsumer(Session session, Destination destination) throws JMSException {
        AQjmsSession aqSession = (AQjmsSession) session;
        String selector = getMessageSelector();
        return aqSession.createConsumer(
                destination,
                selector,
                OracleAQObjORADataFactory.getORADataFactory(),
                null,
                isPubSubNoLocal());
    }
}

and the request listener onMessage method

/**
 * Handles one message: casts it to {@code AQjmsAdtMessage}, extracts the
 * {@code OracleAQObjORADataFactory} payload and prints its id and its payload
 * bytes decoded as UTF-8.
 *
 * Any failure (wrong message type, missing payload, read error) is logged at
 * error level together with the exception itself, so the type and stack trace
 * are not lost; nothing is rethrown.
 *
 * @param msg the received message
 */
public void onMessage(Message msg) {
    try {
        AQjmsAdtMessage aQjmsAdtMessage = (AQjmsAdtMessage) msg;
        OracleAQObjORADataFactory obj = (OracleAQObjORADataFactory) aQjmsAdtMessage.getAdtPayload();
        System.out.println("Datetime: " + obj.getId());
        System.out.println("Payload: " + new String(obj.getPayload(), Charset.forName("UTF-8")));
    }
    catch (Exception jmsException) {
        if (logger.isErrorEnabled()) {
            // Pass the throwable too: the message alone is null for e.g. a NullPointerException.
            logger.error(jmsException.getLocalizedMessage(), jmsException);
        }
    }
}