thaweatherman

Reputation: 1497

Using Camel to aggregate messages with the same header

I have multiple clients that send files to a server. Each set of data is described by two files that share the same name. When a file is received, the server sends a message to my queue containing the file path, the file name, the client ID, and the "type" of the file (all files have the same extension, but there are two types; call them A and B).

As soon as the server has received both files for a set of data, I need to start a program that combines the two. Currently I have something that looks like this:

from("jms:queue.name").aggregate(header("CamelFileName")).completionSize(2).to("exec://FILEPATH?args=");

Where I am stuck is the header("CamelFileName") part and, more specifically, how the aggregator works.

With completionSize set to 2, does it just collect all the messages and store them in some data structure until a second message that matches the first comes through? Also, does header() expect a specific value? I have multiple clients, so I was thinking of putting both the client ID and the file name in the header, but I don't know whether I have to give a specific value. I also don't know whether I can use a regex.

Any ideas or tips would be super helpful. Thanks

EDIT: Here is the code I have now. Based on my description of the problem here and in the comments on the selected answer, does it look correct (apart from the closing brackets that I didn't copy over)?

public static void main(String args[]) throws Exception{
        CamelContext c = new DefaultCamelContext();
        c.addComponent("activemq", activeMQComponent("vm://localhost?broker.persistent=false"));
        //ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
        //c.addComponent("jms", JmsComponent.jmsComponentAutoAcknowledge(connectionFactory));
        c.addRoutes(new RouteBuilder() {
            public void configure() {
                from("activemq:queue:analytics.camelqueue").aggregate(new MyAggregationStrategy()).header("subject").completionSize(2).to("activemq:queue:analytics.success");
            }
        });
        c.start();
        while (true) {
            System.out.println("Waiting on messages to come through for camel");
            Thread.sleep(2 * 1000);
        }
        //c.stop();
    }

    private static class MyAggregationStrategy implements AggregationStrategy {

        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
            if (oldExchange == null)
                return newExchange;
            // and here is where combo stuff goes
            String oldBody = oldExchange.getIn().getBody(String.class);
            String newBody = newExchange.getIn().getBody(String.class);
            boolean oldSet = oldBody.contains("set");
            boolean newSet = newBody.contains("set");
            boolean oldFlow = oldBody.contains("flow");
            boolean newFlow = newBody.contains("flow");
            if ( (oldSet && newFlow) || (oldFlow && newSet) ) {
                //they match so return new exchange with info so extractor can be started with exec
                String combined = oldBody + "\n" + newBody + "\n";
                newExchange.getIn().setBody(combined);
                return newExchange;
            }
            else {
                // no match so do something....
                return null;
            }
        }
    }

Upvotes: 1

Views: 5462

Answers (1)

Ben ODay

Reputation: 21015

You must supply an AggregationStrategy to define how you want to combine Exchanges.

If you are only interested in the file name and in receiving exactly 2 Exchanges, then you can use the UseLatestAggregationStrategy to pass the newest Exchange through once 2 have been 'aggregated'.
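To make 'use latest' concrete, here is a rough plain-Java model of the behaviour (it is not Camel code, and PairwiseAggregator and its methods are hypothetical names made up for illustration). Messages are held per correlation key, each new message is combined with the running result by a strategy function, and the result is released once the completion size is reached. It assumes the file name alone is the correlation key.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.BinaryOperator;

/** Hypothetical plain-Java model of correlate-then-combine; not Camel's implementation. */
class PairwiseAggregator {
    /** Running result and message count for one correlation key. */
    private static final class Pending {
        String body;
        int count;
    }

    private final Map<String, Pending> pending = new HashMap<>();
    private final BinaryOperator<String> strategy; // (oldBody, newBody) -> combined body
    private final int completionSize;

    PairwiseAggregator(BinaryOperator<String> strategy, int completionSize) {
        this.strategy = strategy;
        this.completionSize = completionSize;
    }

    /** Returns the combined body once completionSize messages share the key, else empty. */
    Optional<String> onMessage(String correlationKey, String body) {
        Pending p = pending.get(correlationKey);
        if (p == null) {
            // First message for this key: nothing to combine with yet.
            p = new Pending();
            p.body = body;
            pending.put(correlationKey, p);
        } else {
            p.body = strategy.apply(p.body, body);
        }
        p.count++;
        if (p.count < completionSize) {
            return Optional.empty(); // keep waiting for the partner message
        }
        pending.remove(correlationKey);
        return Optional.of(p.body);
    }

    public static void main(String[] args) {
        // "Use latest": the strategy simply keeps the newest body.
        PairwiseAggregator useLatest = new PairwiseAggregator((oldBody, newBody) -> newBody, 2);
        System.out.println(useLatest.onMessage("data1.dat", "type A")); // held back
        System.out.println(useLatest.onMessage("data2.dat", "type A")); // different key, also held back
        System.out.println(useLatest.onMessage("data1.dat", "type B")); // completes data1.dat
    }
}
```

Note that with 'use latest' only the second message survives, so the information carried by the first one is lost by the time the group completes.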

That said, it sounds like you need to retain both Exchanges (one for each clientId) so you can pass that info on to the 'exec' step. If so, you can combine the Exchanges into a GroupedExchange holder using the built-in aggregation strategy enabled via the groupExchanges option, or specify a custom AggregationStrategy to combine them however you'd like. Just keep in mind that your 'exec' step needs to handle whatever aggregated structure you decide to use.
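As a sketch of the 'keep both' idea, the plain-Java model below groups messages instead of overwriting them (again, this is not Camel code; GroupingAggregator, FileMessage, and the sample paths are hypothetical). It also assumes the correlation key is built from both the client ID and the file name, so that two clients sending files with the same name are not mixed together.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Hypothetical plain-Java model of grouping aggregation; not Camel's implementation. */
class GroupingAggregator {
    /** Hypothetical message shape, based on the fields described in the question. */
    static final class FileMessage {
        final String clientId;
        final String fileName;
        final String type; // "A" or "B"
        final String path;

        FileMessage(String clientId, String fileName, String type, String path) {
            this.clientId = clientId;
            this.fileName = fileName;
            this.type = type;
            this.path = path;
        }
    }

    private final Map<String, List<FileMessage>> pending = new HashMap<>();
    private final int completionSize;

    GroupingAggregator(int completionSize) {
        this.completionSize = completionSize;
    }

    /** Returns the whole group once completionSize messages share a key, else empty. */
    Optional<List<FileMessage>> onMessage(FileMessage message) {
        // Assumption: client ID plus file name identifies one set of data.
        String key = message.clientId + "|" + message.fileName;
        List<FileMessage> group = pending.computeIfAbsent(key, k -> new ArrayList<>());
        group.add(message);
        if (group.size() < completionSize) {
            return Optional.empty(); // still waiting for the partner file
        }
        pending.remove(key);
        return Optional.of(group);
    }

    public static void main(String[] args) {
        GroupingAggregator aggregator = new GroupingAggregator(2);
        // Sample values below are made up for illustration.
        aggregator.onMessage(new FileMessage("client1", "data.dat", "A", "/in/client1/a/data.dat"));
        aggregator.onMessage(new FileMessage("client2", "data.dat", "A", "/in/client2/a/data.dat"));
        Optional<List<FileMessage>> done =
                aggregator.onMessage(new FileMessage("client1", "data.dat", "B", "/in/client1/b/data.dat"));
        // Only client1's pair is complete; client2's file is still pending.
        done.ifPresent(group -> group.forEach(m -> System.out.println(m.clientId + " " + m.type + " " + m.path)));
    }
}
```

Whatever consumes the completed group (the 'exec' step here) then receives both file paths and can build its arguments from them.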

See these unit tests for examples:

https://svn.apache.org/repos/asf/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorTest.java

https://svn.apache.org/repos/asf/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeTest.java

Upvotes: 3
