Seego

Reputation: 106

Apache Camel: how to consume messages from two or more JMS queues

From a programming point of view, I have a very simple business case, but I can't figure out how to implement it using Apache Camel. I have two JMS queues: one receives commands, and the other stores a large number of messages that should be delivered to an external system in batches of 1000 or fewer.

Here is the conceptual message exchange algorithm:

  1. Upon receiving a command message in the 1st JMS queue, prepare an XML message
  2. Send the XML message to an external SOAP web service to obtain a userToken
  3. Using the userToken, prepare another XML message and send it to a REST service to obtain a jobToken
  4. Loop:
    4.1. aggregate messages from the 2nd JMS queue in batches of 1000; stop aggregating at timeout
    4.2. convert every batch to a CSV file
    4.3. send the CSV via HTTP POST to a REST service
    4.4. retain the batchToken assigned to each batch
  5. Using the jobToken, prepare an XML message and send it to the REST service to commit the batches
  6. Using each batchToken, check the execution status of each batch via an XML message to the REST service
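
To make step 4 concrete, here is a rough plain-Java sketch of the batching I have in mind. It is not Camel code: an in-memory BlockingQueue stands in for the 2nd JMS queue, and BatchCollector, nextBatch and toCsv are made-up names. It also assumes each message is already a single CSV row.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper: collects up to maxSize messages, stopping early on timeout. */
final class BatchCollector {

    /**
     * Polls the queue until maxSize messages are collected, or until a single
     * poll waits timeoutMillis without receiving anything (steps 4.1).
     */
    static List<String> nextBatch(BlockingQueue<String> queue, int maxSize, long timeoutMillis)
            throws InterruptedException {
        List<String> batch = new ArrayList<>();
        while (batch.size() < maxSize) {
            String message = queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            if (message == null) {
                break; // timed out: nothing more to read for now
            }
            batch.add(message);
        }
        return batch;
    }

    /** Writes a header line followed by one line per message (step 4.2). */
    static String toCsv(String header, List<String> rows) {
        StringBuilder csv = new StringBuilder(header).append('\n');
        for (String row : rows) {
            csv.append(row).append('\n');
        }
        return csv.toString();
    }
}
```

An empty list returned from nextBatch would mean the 2nd queue is drained and the loop in step 4 can stop.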

While looking at Camel, I created a sample project that models steps 1-3 and 5 of the exchange:

           from("file:src/data?noop=true")
                .setHeader("sfUsername", constant("[email protected]"))
                .setHeader("sfPwd", constant("12345"))
            .to("velocity:com/eip/vm/bulkPreLogin.vm?contentCache=false")
                .setHeader(Exchange.CONTENT_TYPE, constant("text/xml; charset=UTF-8"))
                .setHeader("SOAPAction", constant("login"))
                .setHeader("CamelHttpMethod", constant("POST"))
            .to("http4://bulklogin") // send login
            .to("xslt:com/eip/xslt/bulkLogin.xsl")  //xslt transformation to retrieve userToken
            .process(new Processor() {
                @Override
                public void process(Exchange exchange) throws Exception {
                    String body = exchange.getIn().getBody(String.class);
                    String[] bodyParts = body.split(",");
                    exchange.setProperty("userToken", bodyParts[0]);
                    .....
                }
            })
            .to("velocity:com/eip/vm/jobInsertTeamOppStart.vm")
                .setHeader(Exchange.CONTENT_TYPE, constant("application/xml; charset=UTF-8"))
                .setHeader("X-Session", property("userToken"))
                .setHeader("CamelHttpMethod", constant("POST"))
            .to("http4://scheduleJob")       //schedule job
            .to("xslt:com/eip/xslt/jobInfoTransform.xsl")
            .process(new Processor() {
                @Override
                public void process(Exchange exchange) throws Exception {
                    String body = exchange.getIn().getBody(String.class);
                    exchange.setProperty("jobToken", body.trim());
                }
            })
            //add batches in a loop ???
            .to("velocity:com/eip/vm/jobInsertTeamOppEnd.vm")
                .setHeader(Exchange.HTTP_URI, simple("https://na15.com/services/async/job/${property.jobToken}"))
                .setHeader(Exchange.CONTENT_TYPE, constant("application/xml; charset=UTF-8"))
                .setHeader("X-ID-Session", property("userToken"))
                .setHeader("CamelHttpMethod", constant("POST"))
            .to("http4://closeJob")       //close job
            //check batch?
            .bean(new SomeBean());

So, my question is: How can I read messages from my 2nd JMS queue?

Upvotes: 2

Views: 1920

Answers (1)

Geert Schuring

Reputation: 2163

This doesn't strike me as a very good use case for a single Camel route. I think you should implement the main functionality in a POJO and use Camel's Bean Integration for consuming and producing messages. This will result in code that is much easier to maintain, and it also makes exception handling easier.

See https://camel.apache.org/pojo-consuming.html
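
As a rough illustration of that shape, here is a plain-Java sketch of such a POJO. Everything in it is hypothetical: BulkService is a made-up facade over the SOAP/REST calls from the question, and the batch source is a plain Supplier rather than a real JMS consumer. With Camel's bean integration, onCommand could be bound to the command queue (for example with the @Consume annotation) and the batches could be pulled from the second queue with a ConsumerTemplate, but that wiring is left out here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

/** Hypothetical facade over the SOAP/REST calls described in the question. */
interface BulkService {
    String login();                                      // steps 1-2: returns the userToken
    String openJob(String userToken);                    // step 3: returns the jobToken
    String addBatch(String jobToken, List<String> rows); // steps 4.2-4.3: returns a batchToken
    void closeJob(String jobToken);                      // step 5: commits the batches
}

/** POJO that owns the whole exchange; Camel would only deliver and send messages. */
final class BulkJobRunner {
    private final BulkService service;
    private final Supplier<List<String>> batchSource; // returns an empty list when drained

    BulkJobRunner(BulkService service, Supplier<List<String>> batchSource) {
        this.service = service;
        this.batchSource = batchSource;
    }

    /**
     * Handles one command message (its content is ignored in this sketch) and
     * returns the batch tokens so the caller can check their status (step 6).
     */
    List<String> onCommand(String command) {
        String userToken = service.login();
        String jobToken = service.openJob(userToken);
        List<String> batchTokens = new ArrayList<>();
        List<String> batch;
        while (!(batch = batchSource.get()).isEmpty()) {
            batchTokens.add(service.addBatch(jobToken, batch)); // step 4.4
        }
        service.closeJob(jobToken);
        return batchTokens;
    }
}
```

Keeping the loop in ordinary Java means retries and error handling for each HTTP call can be written with normal try/catch blocks instead of route-level error handlers.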

Upvotes: 1
