How to read Oracle AQ using Camel Route and Save it to a Data Source ?

Oracle AQ (Advanced Queuing) is a feature of the Oracle Database that provides a messaging infrastructure for building distributed applications. It allows applications to communicate and exchange messages asynchronously in a reliable and transactional manner.

Oracle AQ is based on the message queuing paradigm, where messages are sent from a sender application to a queue and then retrieved and processed by a receiver application. The sender and receiver applications can be running on the same or different systems, allowing for distributed communication.

Key features of Oracle AQ include:

  1. Asynchronous Messaging: Oracle AQ enables asynchronous communication between applications, where the sender and receiver do not need to be active at the same time. This decoupling of sender and receiver allows for greater flexibility and scalability in application design.
  2. Reliable Message Delivery: Oracle AQ ensures reliable delivery of messages by providing built-in persistence, transactional support, and message buffering. It maintains a copy of each message until it is successfully consumed by the receiver, ensuring message integrity and fault tolerance.
  3. Message Priority: Messages in Oracle AQ can be assigned different priorities, allowing critical messages to be processed before less important ones. This feature enables the application to handle high-priority messages first, ensuring timely processing of important information.
  4. Message Filtering: Oracle AQ supports message filtering based on user-defined criteria. This allows receivers to selectively consume only messages that meet specific conditions, improving the efficiency of message processing.
  5. Publish-Subscribe Model: Oracle AQ also supports a publish-subscribe messaging model, where publishers can broadcast messages to multiple subscribers. This enables broadcasting information to multiple recipients efficiently.

Oracle AQ is typically used in enterprise applications that require reliable and scalable messaging, such as order processing systems, financial applications, and distributed data integration scenarios. It provides a robust messaging infrastructure that helps build decoupled, scalable, and fault-tolerant systems.

Camel Routes refer to the routing configurations in Apache Camel, an open-source integration framework that facilitates the integration of various systems, applications, and protocols. A Camel Route defines the path that a message takes from a source (input) to a destination (output) through a series of processing steps.

In Camel, a Route is created by combining a set of message-processing components and defining the flow of the message through these components. The route configuration specifies how the message is received, transformed, filtered, and delivered to the desired endpoint.

Here are the key components and concepts associated with Camel Routes:

  1. Route Builder: A Route Builder is a class or configuration file where Camel routes are defined. It provides a domain-specific language (DSL) to define the route configurations and their processing steps.
  2. Message Sources: A route typically starts with a message source, also known as an input or consumer. It could be a file system directory, a JMS queue, an HTTP endpoint, a timer, or any other component that can receive messages.
  3. Message Processors: Message processors are components that perform various operations on the message during its journey through the route. They can transform the message, filter it based on conditions, aggregate multiple messages, invoke external services, or apply any custom logic.
  4. Routing Rules: Routing rules define how the message is directed from one component to another based on certain conditions or criteria. These rules determine the flow of the message within the route and are often based on content-based routing, header values, or dynamic decisions.
  5. Message Endpoints: A message endpoint represents the destination or output of a route. It could be a file system directory, a database, an email server, an external service, or any other component that can receive or process the message.
  6. Error Handling: Camel provides robust error handling capabilities within routes. It allows defining error handlers to catch and handle exceptions, define retry mechanisms, and control the flow in case of failures or exceptions during message processing.

Camel Routes can be defined using various formats, including Java DSL, XML configuration, or Spring DSL, depending on the preference and requirements of the project.

Overall, Camel Routes provide a flexible and powerful way to define and orchestrate message-based integration flows, enabling seamless communication and interaction between different systems and components.

How do we start when there is a need to handle a legacy system with a newer technology ?
This high level code example includes how to connect to Oracle Advanced Queue from a target Database, read the message from the AQ and then using camel routes perform the business logic on method level.
Note : Make sure to add correct class and dependency via mvn repository or your defined repo. Some example of imports below:

import org.apache.camel.component.jms.JmsComponent;
import org.apache.camel.component.jms.JmsConfiguration;
import oracle.jms.AQjmsFactory;
import oracle.jdbc.pool.OracleDataSource;
import org.apache.camel.LoggingLevel;
import org.apache.camel.TypeConversionException;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.bean.validator.BeanValidationException;
First step where we will setup the connection factory and JmsConfiguration:
@Configuration
@EnableJms
@EnableConfigurationProperties(AqOracleDataSourceProperties.class)
@Profile("dev")
public class OracleAdvancedQueueConfig {
    private static final String JMS_CLIENT_ACKNOWLEDGEMENT_MODE = "CLIENT_ACKNOWLEDGE";
	//This should be the connection properties of the database that you are 
          reading from. 
    @Autowired
    private AqOracleDataSourceProperties aqOracleDataSourceProperties;
	//Using AQjmsFactory for Oracle Advanced Query aka AQ to create a 
          connection factory
    @Bean
    public QueueConnectionFactory connectionFactoryOracleAQQueue() throws JMSException, SQLException {
        return AQjmsFactory.getQueueConnectionFactory(oracleAqDataSource());
    }
	//Now creating Jms Configuration
    @Bean
    public JmsConfiguration oracleAqJmsConfiguration() throws Exception {
        JmsConfiguration jmsConfiguration = new JmsConfiguration();
        jmsConfiguration.setConnectionFactory(connectionFactoryOracleAQQueue());
        jmsConfiguration.setAcceptMessagesWhileStopping(true);
        jmsConfiguration.setFormatDateHeadersToIso8601(true);
        return jmsConfiguration;
    }
	//Now creating the Jms Component specific to camel Routes. Make sure the 
         method name matches with     the one you have in the properties,yaml etc.
    @Bean
    public JmsComponent oracleJmsAq() throws Exception {
        JmsComponent jmsComponent = new JmsComponent();
        jmsComponent.setConfiguration(oracleAqJmsConfiguration());
        return jmsComponent;
    }
    
    private DataSource oracleAqDataSource() throws SQLException {
        OracleDataSource oracleDataSource = new OracleDataSource();
        oracleDataSource.setUser(aqOracleDataSourceProperties.getUsername());
        oracleDataSource.setPassword(aqOracleDataSourceProperties.getPassword());
        oracleDataSource.setURL(aqOracleDataSourceProperties.getUrl());
        oracleDataSource.setImplicitCachingEnabled(true);
        oracleDataSource.setFastConnectionFailoverEnabled(true);
        return oracleDataSource;
    }
// Second step should be Camel Routes setup
**Make sure to get the dependency first**
**Make sure the method name at JmsComponent matches with the route jms component**
@Component
@Profile("dev")
public class YourRoute extends RouteBuilder {
    @SuppressWarnings("unchecked")
    @Override
    public void configure() throws Exception {
	//Use your logic here to catch any exception
        onException(UncategorizedSQLException.class)
                .maximumRedeliveries(0)
                .handled(false)
                .useOriginalMessage()
                .log(LoggingLevel.WARN, logger, "Unable to save message: " + 
                BODY_AND_STACKTRACE);
        onException(IOException.class, Exception.class)
                .handled(false)
                .useOriginalMessage()
                .logHandled(true)
                .getRedeliveryPolicy()
                .logRetryStackTrace(true)
                .retryAttemptedLogLevel(LoggingLevel.WARN) // generate email
                .retriesExhaustedLogLevel(LoggingLevel.WARN) // generate email
                .allowRedeliveryWhileStopping(false)
                .maximumRedeliveries(180)
                .redeliveryDelay(100)
                .maximumRedeliveryDelay(10000)
                .useExponentialBackOff();
	//Make sure to have these properties in yaml or some other class based on your needs.
        from("{{app.some-camel-component}}{{app.oracle-aq-endpoint.name}}")
                .routeId(ROUTE_ID + "Dev")
                .to(METHOD_WHERE_WE_WILL_SAVE_THE_MESSAGE);
        
        from(METHOD_WHERE_WE_WILL_SAVE_THE_MESSAGE)
                .routeId(ROUTE_ID)
                .bean(someServiceClassNameWhereYourMethodLives, "saveToDatabaseMethod(${body}, ${header.JMSTimestamp})");
    }
}

Posted

in

by

Comments

Leave a Reply

Your email address will not be published. Required fields are marked *