Reputation: 1611
I currently have the following Camel route:
<camelContext id="my-camel-context" xmlns="http://camel.apache.org/schema/spring">
    <propertyPlaceholder id="envProps" location="classpath:myapp.properties" />
    <route id="my-camel-route">
        <from uri="{{start.uri}}"/>
        <setHeader headerName="id">
            <constant>1</constant>
        </setHeader>
        <to uri="bean:preProcessor?method=process" />
        <aggregate strategyRef="myAggregationStrategy" completionSize="1">
            <correlationExpression>
                <simple>${header.id} == 1</simple>
            </correlationExpression>
            <to uri="bean:postProcessor?method=process" />
        </aggregate>
        <to uri="bean:mailer?method=process" />
    </route>
</camelContext>
<bean id="myAggregationStrategy" class="com.me.myapp.MyAggregationStrategy" />
<bean id="postProcessor" class="com.me.myapp.PostProcessor" />
<bean id="mailer" class="com.me.myapp.Mailer" />
For now, I'm not really aggregating anything meaningful (completionSize=1); I'm just testing AggregationStrategy out. Here's my strategy:
public class MyAggregationStrategy implements AggregationStrategy {
    @Override
    public Exchange aggregate(Exchange aggregatingExchange, Exchange incomingExchange) {
        AppPayload payload = null;
        if(aggregatingExchange == null)
            payload = new AppPayload(); // This should prevent it from being NULL below in PostProcessor...
        else
            payload = (AppPayload)incomingExchange.getIn().getBody();
        payload.setCargo((Order)incomingExchange.getIn().getBody());
        if(aggregatingExchange == null) {
            incomingExchange.getIn().setBody(payload);
            return incomingExchange;
        }
        else
            return aggregatingExchange;
    }
}
And here is my postProcessor bean:
public class PostProcessor implements Processor {
    @Override
    public void process(Exchange exchange) {
        try {
            System.out.println("In PostProcessor...");
            AppPayload payload = (AppPayload)exchange.getIn().getBody();
            System.out.println("\t...payload acquired...");
            if(payload == null)
                System.out.println("Payload is NULL.");
        } catch(Throwable throwable) {
            System.out.println(ExceptionUtils.getFullStackTrace(throwable));
        }
    }
}
When I run this code, I see log messages from my preProcessor bean that indicate it is executing correctly. I also see that MyAggregationStrategy is correctly "aggregating" the message and then letting it pass on to postProcessor after the first message arrives (again, because completionSize=1). However, I'm getting the following output in postProcessor:
In PostProcessor...
...payload acquired...
Payload is NULL.
Can anyone see why payload would be NULL? Shouldn't it have been initialized inside MyAggregationStrategy? I'm happy to post more code, but I believe it stems from me using the AggregationStrategy API incorrectly.
Upvotes: 0
Views: 1950
Reputation: 2255
Adding to what @hveiga already mentioned: I had a similar issue, which I resolved by adding a header to my messages. In your case, however, you are not using a splitter and you already have a header defined. One piece of information I got from Claus Ibsen is that on the first invocation the old exchange is null, so the strategy has to check for a null object.
See this for more explanation - Apache Camel - Split and aggregate - Old Exchange is always null
Track the complete explanation here - http://camel.465427.n5.nabble.com/Split-and-Aggregate-Old-Exchange-is-null-everytime-in-AggregationStrategy-td5746365.html
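To make the first-call behavior concrete, here is a minimal sketch in plain Java. It does not use Camel at all: `Msg`, `aggregate` and `runGroup` are hypothetical stand-ins for an exchange, the strategy method and the aggregator's loop, assumed only for illustration. The point it shows is that the accumulator starts out as null, so the strategy has to seed the result from the new message on the first call.

```java
import java.util.ArrayList;
import java.util.List;

public class FirstCallNullSketch {

    // Hypothetical stand-in for an exchange: it only carries a list of bodies.
    static final class Msg {
        final List<String> bodies = new ArrayList<>();

        Msg(String body) {
            bodies.add(body);
        }
    }

    // Same shape as an aggregate(oldExchange, newExchange) method.
    static Msg aggregate(Msg oldMsg, Msg newMsg) {
        if (oldMsg == null) {
            // First message of the group: nothing has been aggregated yet,
            // so the new message becomes the accumulator.
            return newMsg;
        }
        oldMsg.bodies.addAll(newMsg.bodies);
        return oldMsg;
    }

    // Simulates an aggregator driving the strategy, starting from a null accumulator.
    static List<String> runGroup(List<String> incoming) {
        Msg current = null;
        for (String body : incoming) {
            current = aggregate(current, new Msg(body));
        }
        return current == null ? new ArrayList<>() : current.bodies;
    }

    public static void main(String[] args) {
        System.out.println(runGroup(List.of("a", "b", "c"))); // prints [a, b, c]
    }
}
```

With a completion size of 1, as in the question, only the null branch would ever run for each group in this sketch.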
Upvotes: 0
Reputation: 6915
I believe you are confusing aggregatingExchange and incomingExchange. Can you try this:
public class MyAggregationStrategy implements AggregationStrategy {
    @Override
    public Exchange aggregate(Exchange aggregatingExchange, Exchange incomingExchange) {
        AppPayload payload = null;
        if(aggregatingExchange == null) {
            payload = new AppPayload(); // This should prevent it from being NULL below in PostProcessor...
        } else {
            payload = (AppPayload)aggregatingExchange.getIn().getBody();
        }
        payload.setCargo((Order)incomingExchange.getIn().getBody());
        if(aggregatingExchange == null) {
            incomingExchange.getIn().setBody(payload);
            return incomingExchange;
        } else {
            return aggregatingExchange;
        }
    }
}
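The roles of the two exchanges can be illustrated with a small sketch in plain Java. Everything in it is a stand-in assumed for illustration, not Camel or your real classes: `Holder` plays the exchange, and `Order` and `AppPayload` are reduced to the bare minimum, with the assumption that `Order` is not a subtype of `AppPayload`. The accumulated payload lives in the old (aggregating) exchange, while the new (incoming) exchange still carries a raw order, so casting the incoming body to the payload type fails.

```java
public class ExchangeRolesSketch {

    // Minimal stand-ins for the question's domain classes (assumed shapes).
    static final class Order {
        final String id;

        Order(String id) {
            this.id = id;
        }
    }

    static final class AppPayload {
        Order cargo;

        void setCargo(Order order) {
            this.cargo = order;
        }
    }

    // Hypothetical stand-in for an exchange with a single mutable body.
    static final class Holder {
        Object body;

        Holder(Object body) {
            this.body = body;
        }
    }

    // Mirrors the corrected strategy: the payload is read from the OLD holder.
    static Holder aggregate(Holder oldEx, Holder newEx) {
        AppPayload payload = (oldEx == null) ? new AppPayload() : (AppPayload) oldEx.body;
        payload.setCargo((Order) newEx.body);
        if (oldEx == null) {
            newEx.body = payload;
            return newEx;
        }
        return oldEx;
    }

    // Runs two messages through the strategy and returns the id of the stored cargo.
    static String cargoIdAfterTwoMessages() {
        Holder first = aggregate(null, new Holder(new Order("o-1")));
        Holder second = aggregate(first, new Holder(new Order("o-2")));
        return ((AppPayload) second.body).cargo.id;
    }

    // Shows what happens if the payload is read from the INCOMING holder instead.
    static boolean castingIncomingBodyFails() {
        Holder incoming = new Holder(new Order("o-2"));
        try {
            AppPayload wrong = (AppPayload) incoming.body;
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(cargoIdAfterTwoMessages());  // prints o-2
        System.out.println(castingIncomingBodyFails()); // prints true
    }
}
```

Note that the second branch is only reached when a group receives more than one message, which does not happen with completionSize=1.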
Upvotes: 1