How to enqueue on Oracle AQ table on commit with Java and consume with a JMS client
I was able to accomplish this, although I had to guess my way around many parts of the Oracle API and collect hints from various blogs. For anyone interested, here is how I got it working:
1. I created an Oracle object type in the Oracle database.
2. With this Oracle object, I created queue tables that use the object type as the payload.
3. I am now able to enqueue AQMessage types with a STRUCT payload containing the object data.
4. And I am able to dequeue with a JMS consumer that understands the ADT payload type (thanks to the article at http://blog.javaforge.net/post/30858904340/oracle-advanced-queuing-spring-custom-types).
Here are the steps with code. First, create the Oracle object. The object can have fields of any scalar data type, such as VARCHAR or TIMESTAMP, as well as LOB types such as BLOB and CLOB. In this case I declared one of the fields as a BLOB to make things more complicated.
-- Object type used as the queue payload: a string identifier plus a binary body.
CREATE OR REPLACE TYPE aq_event_obj AS OBJECT (
    id      VARCHAR2(100),
    payload BLOB
);
COMMIT;
Now create the queue table. The payload type of the table is the oracle object.
/**
 * Runs the three DBMS_AQADM calls that prepare the queue: create the queue
 * table with AQ_EVENT_OBJ as its payload type, create the queue on that
 * table, and start the queue.
 *
 * <p>The blocks are executed in order; if one of them fails, the remaining
 * ones are not attempted.
 *
 * @param conn connection handed to {@code doUpdateDatabase} for every block
 * @throws SQLException declared for failures raised while running a block
 */
private void setup(Connection conn) throws SQLException {
    final String[] plsqlBlocks = {
        // 1. Queue table whose payload type is the custom object.
        "BEGIN "
            + "DBMS_AQADM.CREATE_QUEUE_TABLE( "
            + " QUEUE_TABLE => 'OBJ_SINGLE_QUEUE_TABLE', "
            + " QUEUE_PAYLOAD_TYPE => 'AQ_EVENT_OBJ', "
            + " COMPATIBLE => '10.0'); "
            + "END; ",
        // 2. The queue itself, stored in the table created above.
        "BEGIN "
            + "DBMS_AQADM.CREATE_QUEUE( "
            + " QUEUE_NAME => 'OBJ_SINGLE_QUEUE', "
            + " QUEUE_TABLE => 'OBJ_SINGLE_QUEUE_TABLE'); "
            + "END; ",
        // 3. Start the queue.
        "BEGIN "
            + " DBMS_AQADM.START_QUEUE('OBJ_SINGLE_QUEUE'); "
            + "END; "
    };

    for (String plsqlBlock : plsqlBlocks) {
        doUpdateDatabase(conn, plsqlBlock);
    }
}
You can now enqueue AQMessage types in Java with a struct instance of the object
public void enqueueMessage(OracleConnection conn, String correlationId, byte[] payloadData) throws Exception { // First create the message properties: AQMessageProperties aqMessageProperties = AQFactory.createAQMessageProperties(); aqMessageProperties.setCorrelation(correlationId); aqMessageProperties.setExceptionQueue(EXCEPTION_QUEUE_NAME); // Specify an agent as the sender: AQAgent aqAgent = AQFactory.createAQAgent(); aqAgent.setName(SENDER_NAME); aqAgent.setAddress(QUEUE_NAME); aqMessageProperties.setSender(aqAgent); // Create the payload StructDescriptor structDescriptor = StructDescriptor.createDescriptor(EVENT_OBJECT, conn); Map<String, Object> payloadMap = new HashMap<String, Object>(); payloadMap.put("ID", correlationId); payloadMap.put("PAYLOAD", new OracleAQBLOBUtil().createBlob(conn, payloadData)); STRUCT struct = new STRUCT(structDescriptor, conn, payloadMap); // Create the actual AQMessage instance: AQMessage aqMessage = AQFactory.createAQMessage(aqMessageProperties); aqMessage.setPayload(struct); AQEnqueueOptions opt = new AQEnqueueOptions(); opt.setDeliveryMode(AQEnqueueOptions.DeliveryMode.PERSISTENT); opt.setVisibility(AQEnqueueOptions.VisibilityOption.ON_COMMIT); // execute the actual enqueue operation: conn.enqueue(QUEUE_NAME, opt, aqMessage);}
The blob field needed special handling
/**
 * Helper for copying byte arrays into, and out of, Oracle {@code BLOB} values.
 */
public class OracleAQBLOBUtil {

    /** Size of the in-memory buffer used when draining a BLOB stream. */
    private static final int READ_BUFFER_SIZE = 8192;

    /**
     * Creates a temporary, session-duration BLOB on the given connection and
     * writes {@code payload} into it starting at position 1.
     *
     * <p>The payload is written in slices of at most {@code blob.getBufferSize()}
     * bytes, directly from the array (no intermediate stream or copy).
     *
     * @param conn    connection that owns the temporary BLOB
     * @param payload bytes to store; must not be {@code null}
     * @return the populated BLOB
     * @throws NullPointerException if {@code payload} is {@code null}; checked
     *         before the temporary BLOB is allocated so nothing is left behind
     * @throws Exception if creating or writing the BLOB fails
     */
    public BLOB createBlob(OracleConnection conn, byte[] payload) throws Exception {
        if (payload == null) {
            throw new NullPointerException("payload must not be null");
        }

        BLOB blob = BLOB.createTemporary(conn, false, BLOB.DURATION_SESSION);
        OutputStream outputStream = blob.setBinaryStream(1L);
        try {
            int chunkSize = blob.getBufferSize();
            for (int offset = 0; offset < payload.length; offset += chunkSize) {
                int length = Math.min(chunkSize, payload.length - offset);
                outputStream.write(payload, offset, length);
            }
        } finally {
            outputStream.close();
        }
        return blob;
    }

    /**
     * Reads the whole content of {@code blob} into a byte array.
     *
     * <p>The BLOB's stream is always closed, including when reading fails.
     *
     * @param blob BLOB to read
     * @return the BLOB content
     * @throws Exception if opening or reading the stream fails
     */
    public byte[] saveOutputStream(BLOB blob) throws Exception {
        InputStream inputStream = blob.getBinaryStream();
        try {
            ByteArrayOutputStream collected = new ByteArrayOutputStream();
            byte[] buffer = new byte[READ_BUFFER_SIZE];
            int bytesRead;
            while ((bytesRead = inputStream.read(buffer)) != -1) {
                collected.write(buffer, 0, bytesRead);
            }
            return collected.toByteArray();
        } finally {
            inputStream.close();
        }
    }
}
For the consumer, you need to provide an instance of ORADataFactory that lets the consumer understand the payload type (your custom object).
// Cast the JMS session to AQjmsSession; createReceiver with a payload factory is called on that type.
AQjmsSession queueSession = (AQjmsSession) session;
// Resolve the destination by name (ctx is presumably a JNDI context - confirm against the caller).
Queue queue = (Queue) ctx.lookup(queueName);
// The ORADataFactory instance is what lets the receiver understand the custom object payload.
MessageConsumer receiver = queueSession.createReceiver(queue, new OracleAQObjORADataFactory());
Where the code for OracleAQObjORADataFactory is
import java.io.ByteArrayOutputStream;import java.io.InputStream;import java.sql.Connection;import java.sql.SQLException;import oracle.jdbc.OracleTypes;import oracle.jpub.runtime.MutableStruct;import oracle.sql.BLOB;import oracle.sql.Datum;import oracle.sql.ORAData;import oracle.sql.ORADataFactory;import oracle.sql.STRUCT;public class OracleAQObjORADataFactory implements ORAData, ORADataFactory { public static final String EVENT_OBJECT = "SYSTEM.AQ_EVENT_OBJ"; public static final int _SQL_TYPECODE = OracleTypes.STRUCT; protected MutableStruct _struct; protected static int[] _sqlType = { java.sql.Types.VARCHAR, java.sql.Types.VARBINARY }; protected static ORADataFactory[] _factory = new ORADataFactory[2]; protected static final OracleAQObjORADataFactory _AqEventObjFactory = new OracleAQObjORADataFactory (); public static ORADataFactory getORADataFactory() { return _AqEventObjFactory; } /* constructors */ protected void _init_struct(boolean init) { if (init) _struct = new MutableStruct(new Object[2], _sqlType, _factory); } public OracleAQObjORADataFactory () { _init_struct(true); } public OracleAQObjORADataFactory (String id, byte[] payload) throws SQLException { _init_struct(true); setId(id); setPayload(payload); } /* ORAData interface */ public Datum toDatum(Connection c) throws SQLException { return _struct.toDatum(c, EVENT_OBJECT); } /* ORADataFactory interface */ public ORAData create(Datum d, int sqlType) throws SQLException { return create(null, d, sqlType); } protected ORAData create(OracleAQObjORADataFactory o, Datum d, int sqlType) throws SQLException { if (d == null) return null; if (o == null) o = new OracleAQObjORADataFactory (); o._struct = new MutableStruct((STRUCT) d, _sqlType, _factory); return o; } public String getId() throws SQLException { return (String) _struct.getAttribute(0); } public void setId(String id) throws SQLException { _struct.setAttribute(0, id); } public byte[] getPayload() throws SQLException { BLOB blob = (BLOB) 
_struct.getAttribute(1); InputStream inputStream = blob.getBinaryStream(); return getBytes(inputStream); } public byte[] getBytes(InputStream body) { int c; try { ByteArrayOutputStream f = new ByteArrayOutputStream(); while ((c = body.read()) > -1) { f.write(c); } f.close(); byte[] result = f.toByteArray(); return result; } catch (Exception e) { System.err.println("Exception: " + e.getMessage()); e.printStackTrace(); return null; } } public void setPayload(byte[] payload) throws SQLException { _struct.setAttribute(1, payload); }}
You're probably using Camel or Spring in your project, in which case:
1. If you're on Camel 2.10.2 or later, you can create a JMS consumer with a custom message listener container (CAMEL-5676).
2. If you're on an earlier version, you may not be able to use the endpoint approach (I couldn't figure it out), but you can use a JMS request listener.
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:jms="http://www.springframework.org/schema/jms"
       xmlns:p="http://www.springframework.org/schema/p"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.0.xsd">

    <!-- this is just an example, you can also use a datasource as the ctor arg -->
    <bean id="connectionFactoryOracleAQQueue" class="oracle.jms.AQjmsFactory" factory-method="getQueueConnectionFactory">
        <constructor-arg index="0">
            <value>jdbc:oracle:thin:@blrub442:1522:UB23</value>
        </constructor-arg>
        <constructor-arg index="1" type="java.util.Properties">
            <value></value>
        </constructor-arg>
    </bean>

    <!-- Wraps the AQ connection factory so every connection is opened with these credentials. -->
    <bean id="oracleQueueCredentials" class="org.springframework.jms.connection.UserCredentialsConnectionFactoryAdapter">
        <property name="targetConnectionFactory">
            <ref bean="connectionFactoryOracleAQQueue" />
        </property>
        <property name="username">
            <value>system</value>
        </property>
        <property name="password">
            <value>oracle</value>
        </property>
    </bean>

    <!-- Definitions for JMS Listener classes that we have created -->
    <bean id="aqMessageListener" class="com.misys.test.JmsRequestListener" />

    <!-- The queue name must be the one created and started by the setup code (OBJ_SINGLE_QUEUE). -->
    <bean id="aqEventQueue" class="com.misys.test.OracleAqQueueFactoryBean">
        <property name="connectionFactory" ref="oracleQueueCredentials" />
        <property name="oracleQueueName" value="OBJ_SINGLE_QUEUE" />
    </bean>

    <!-- The Spring DefaultMessageListenerContainer configuration.
         This bean is automatically loaded when the JMS application context is started -->
    <bean id="jmsContainer" class="com.misys.test.AQMessageListenerContainer" scope="singleton">
        <property name="connectionFactory" ref="oracleQueueCredentials" />
        <property name="destination" ref="aqEventQueue" />
        <property name="messageListener" ref="aqMessageListener" />
        <property name="sessionTransacted" value="false" />
    </bean>
</beans>
The custom message listener container
/**
 * Listener container whose consumers are created through Oracle's
 * {@code AQjmsSession}, passing the ORADataFactory for the custom payload
 * object along with the container's selector and no-local setting.
 */
public class AQMessageListenerContainer extends DefaultMessageListenerContainer {

    /**
     * Creates the consumer on the AQ session instead of the plain JMS session.
     *
     * @param session     session to create the consumer on; cast to {@code AQjmsSession}
     * @param destination destination to consume from
     * @return the consumer created by the AQ session
     * @throws JMSException if the consumer cannot be created
     */
    @Override
    protected MessageConsumer createConsumer(Session session, Destination destination) throws JMSException {
        AQjmsSession aqSession = (AQjmsSession) session;
        String selector = getMessageSelector();
        return aqSession.createConsumer(
                destination,
                selector,
                OracleAQObjORADataFactory.getORADataFactory(),
                null,
                isPubSubNoLocal());
    }
}
and the request listener onMessage method
/**
 * Handles one message from the queue: casts it to an ADT message, extracts
 * the custom payload object and prints its id and its payload decoded as
 * UTF-8.
 *
 * <p>Any failure is logged at error level together with the exception, so the
 * stack trace and cause are not lost; nothing is rethrown.
 *
 * @param msg the received message; expected to be an {@code AQjmsAdtMessage}
 */
public void onMessage(Message msg) {
    try {
        AQjmsAdtMessage aQjmsAdtMessage = (AQjmsAdtMessage) msg;
        OracleAQObjORADataFactory obj = (OracleAQObjORADataFactory) aQjmsAdtMessage.getAdtPayload();
        System.out.println("Datetime: " + obj.getId());

        // getPayload() can return null; print "null" rather than failing
        // with a NullPointerException while decoding.
        byte[] payload = obj.getPayload();
        String payloadText = (payload == null) ? null : new String(payload, Charset.forName("UTF-8"));
        System.out.println("Payload: " + payloadText);
    } catch (Exception jmsException) {
        if (logger.isErrorEnabled()) {
            // Pass the exception itself so the logger records the stack trace.
            logger.error(jmsException.getLocalizedMessage(), jmsException);
        }
    }
}