Klaus

Reputation: 2396

Camel Aggregator Misunderstanding

I wonder if somebody can shed some light on why GroupedExchangeAggregationStrategy is not doing what I expected.

I have a list of messages that gets split properly, and each part is passed to my processing route. After that route, the aggregator should collect all the responses, combine them into one final exchange, and return it.

What I see, however, is that each of the split messages is processed properly, but the aggregator does not seem to put them together. Instead, the parent route ends with the original message. This is the behavior I would expect with the default Camel DefaultAggregationCollection and UseLatestAggregationStrategy. But if I change the strategy to GroupedExchangeAggregationStrategy, shouldn't I see something else?

It also seems that the route the aggregator forwards to is not executed at all. I have a log statement right after the from and another at the end of the route, but neither is ever printed.

Here is my split:

                <split>
                    <simple>${body}</simple>
                    <to uri="direct:splitprocess"/>
                </split> 
                <aggregate parallelProcessing="true" strategyRef="productAgrregator" >
                    <correlationExpression><simple>${headers.correlationId}</simple></correlationExpression>
                     <completionSize>
                        <simple>${headers.resultSize}</simple>
                     </completionSize>
                    <to uri="direct:aggregated"/>
                </aggregate>

And here is the route direct:aggregated:

        <route id="aggregated">
            <from uri="direct:aggregated"/>
            <log message="Starting aggregation with message: ${body}"/>
            <removeHeaders pattern="Exchange.CONTENT_ENCODING"/>
            <transform>
                <simple>${property.CamelGroupedExchange}</simple>
            </transform>
            <log message="Aggregated message: ${body}"/>
        </route>

And here is where I define the strategy:

<bean id="productAgrregator" class="org.apache.camel.processor.aggregate.GroupedExchangeAggregationStrategy"/>

Another thing that puzzles me: I specified parallelProcessing="true", but in the logs I still see the statement "Done sequential processing 3 exchanges". Shouldn't this be parallel processing?

Instead of using GroupedExchangeAggregationStrategy, I also tried simply specifying groupExchanges="true". It did not help at all.

As you can see, I am quite confused.

Below is a completely isolated example. It can be called with a POST request supplying a JSON body such as: {"zero":"","one":""}. My expectation was to receive something like: {one=1A, zero=0A} or {one=0A, zero=1A}. Instead, I get {zero=,one=}.

        <route id="split">
            <from uri="restlet:/split?restletMethod=POST"/>
            <unmarshal ref="json"/>
            <setHeader headerName="bodySize"><simple>${body.size()}</simple></setHeader>
            <setHeader headerName="correlationId"><constant>'12345'</constant></setHeader>
            <split>
                <simple>${body}</simple>
                <to uri="direct:split-process"/>
                <aggregate parallelProcessing="true" groupExchanges="true">
                    <camel:correlationExpression><simple>${headers.correlationId}</simple></camel:correlationExpression>
                    <camel:completionSize><simple>${headers.bodySize}</simple></camel:completionSize>
                    <camel:to uri="direct:split-aggregate"/>
                </aggregate>
            </split>
        </route>


        <route id="split-process">
            <from uri="direct:split-process"/>
            <transform>
                <simple>${body} ${headers.CamelSplitIndex}A</simple>
            </transform>
        </route>            

        <route id="split-aggregate">
            <from uri="direct:split-aggregate"/>
            <to uri="mock:results"/>
        </route>            

Upvotes: 1

Views: 9909

Answers (2)

Klaus

Reputation: 2396

Here is what I ended up with:

  1. No need to use an extra aggregator. Split has a built-in aggregator, as stated in the documentation (http://camel.apache.org/splitter.html#Splitter-WhattheSplitterreturns). I cite: "This Splitter can be viewed as having a build in light weight Aggregator."

  2. I implemented a custom strategy to combine the exchanges. GroupedExchangeAggregationStrategy seems to return only one message, presumably the last one processed. The custom strategy is the StringAggregationStrategy copied verbatim from: http://camel.apache.org/aggregator2.html

So, in the end my route looks as simple as this:

        <route id="split">
            <from uri="restlet:/split?restletMethod=POST"/>
            <split parallelProcessing="true" strategyRef="strategy">
                <tokenize token=","/>
                <transform>
                    <simple>${body} ${headers.CamelSplitIndex}CD</simple>
                </transform>
            </split>
        </route> 
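
The route above refers to strategyRef="strategy", but the bean behind that id is not shown. A minimal sketch of how it might be registered, mirroring the productAgrregator bean definition in the question. The class name com.example.StringAggregationStrategy is a hypothetical placeholder for wherever the copied StringAggregationStrategy lives in your project; it is an assumption, not something taken from the original setup:

```xml
<!-- Assumption: com.example.StringAggregationStrategy is a placeholder class name.
     Point it at your own copy of the StringAggregationStrategy from the Camel docs. -->
<bean id="strategy" class="com.example.StringAggregationStrategy"/>
```

The id has to match the value of strategyRef on the split element, and the bean belongs in the same Spring context as the camelContext that holds the route.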

Upvotes: 4

vikingsteve

Reputation: 40428

I'll hazard an answer here.

Since your <aggregate...> falls outside your <split>, only 1 message is sent into the aggregator, so the completion size is never reached. Try moving it inside the <split>:

        <split>
            <simple>${body}</simple>
            <to uri="direct:splitprocess"/>
            <aggregate parallelProcessing="true" strategyRef="productAgrregator" >
                <correlationExpression><simple>${headers.correlationId}</simple></correlationExpression>
                 <completionSize>
                    <simple>${headers.resultSize}</simple>
                 </completionSize>
                <to uri="direct:aggregated"/>
            </aggregate>
        </split> 

Fingers crossed this will work... ;)

Upvotes: 1
