I have the following route:
<route id="my-aggregator">
    <from uri="direct:aggregator" />
    <aggregate strategyRef="myAggregationStrategy" completionSize="3">
        <correlationExpression>
            <simple>${header.id}</simple>
        </correlationExpression>
        <to uri="bean:payloadProcessor?method=process" />
    </aggregate>
</route>
As you can see, it waits until it has received 3 exchanges that all have a matching header.id
value. Once it receives 3 such messages, the following code executes:
public class MyAggregationStrategy implements AggregationStrategy {
    @Override
    public Exchange aggregate(Exchange e1, Exchange e2) {
        // The first (of 3) message will have a `List<Widget>` on its body. Extract it here.
        List<Widget> redWidgets = ??? // e1.getIn().getBody()? e2.getIn().getBody()? something else?
        // The second (of 3) message will have a `List<Widget>` on its body. Extract it here.
        List<Widget> blueWidgets = ??? // e1.getIn().getBody()? e2.getIn().getBody()? something else?
        // The third (of 3) message will have a `List<Widget>` on its body. Extract it here.
        List<Widget> greenWidgets = ??? // e1.getIn().getBody()? e2.getIn().getBody()? something else?
        WidgetPayload payload = new WidgetPayload(redWidgets, blueWidgets, greenWidgets);
        // Which "output" exchange do I set payload to, so that it gets routed on to the
        // "payloadProcessor" bean?
        e1.getOut().setBody(payload); /* or */ e2.getOut().setBody(payload);
        // What do we even return?
        return e1; /* or return e2; */
    }
}
So in the aggregator, I'm trying to get access to each List<Widget> that was sent to the aggregator by various routes, combine them into a WidgetPayload instance, and then set the WidgetPayload as the outbound/return value of the outbound exchange. But I can't figure out the correct way to do all this. Specifically:
- How do I extract the List<Widget> from e1 and e2?
- What do e1 and e2 even represent? In-Out? New/Old? Something else?
- What changes to e1/e2 do I make so that payload is forwarded on to the payloadProcessor bean?
Look at the code example on the page I mentioned; the first argument is the aggregated exchange. It is null on the first invocation of an aggregation. You have to create your WidgetPayload on the first invocation and then keep adding widgets to it with every subsequent invocation. Then, when you get to the route step after the aggregator, the body of the exchange is your aggregated WidgetPayload.
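To make that fold concrete, here is a minimal sketch that runs without Camel on the classpath. FakeExchange is a stand-in for org.apache.camel.Exchange, and Widget, WidgetPayload (with a hypothetical add method instead of the three-argument constructor from the question), FoldingStrategy and AggregationDemo are illustrative names, not anything from the question or from Camel. In a real AggregationStrategy the signature is aggregate(Exchange oldExchange, Exchange newExchange), and the body accessors would be newExchange.getIn().getBody(List.class) and getIn().setBody(payload). The sketch also assumes the three messages arrive in red, blue, green order, as the question does.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in for org.apache.camel.Exchange, reduced to a body holder (assumption for this sketch).
class FakeExchange {
    private Object body;
    FakeExchange(Object body) { this.body = body; }
    Object getBody() { return body; }
    void setBody(Object body) { this.body = body; }
}

// Hypothetical domain types; the question does not show their definitions.
class Widget {
    final String color;
    Widget(String color) { this.color = color; }
}

class WidgetPayload {
    // One entry per incoming message, in arrival order.
    final List<List<Widget>> batches = new ArrayList<>();
    void add(List<Widget> widgets) { batches.add(widgets); }
}

class FoldingStrategy {
    // Mirrors the shape of AggregationStrategy.aggregate(oldExchange, newExchange).
    @SuppressWarnings("unchecked")
    FakeExchange aggregate(FakeExchange oldExchange, FakeExchange newExchange) {
        List<Widget> incoming = (List<Widget>) newExchange.getBody();
        if (oldExchange == null) {
            // First message of the group: start the payload and let this
            // exchange become the running aggregate.
            WidgetPayload payload = new WidgetPayload();
            payload.add(incoming);
            newExchange.setBody(payload);
            return newExchange;
        }
        // Later messages: fold the new list into the payload already carried
        // by the aggregate, and keep returning the same aggregate.
        WidgetPayload payload = (WidgetPayload) oldExchange.getBody();
        payload.add(incoming);
        return oldExchange;
    }
}

class AggregationDemo {
    // Simulates what the aggregator does for one correlation group of three messages.
    static WidgetPayload run() {
        FoldingStrategy strategy = new FoldingStrategy();
        FakeExchange aggregated = null; // null before the first message, as in the answer
        for (String color : new String[] {"red", "blue", "green"}) {
            List<Widget> body = new ArrayList<>();
            body.add(new Widget(color));
            aggregated = strategy.aggregate(aggregated, new FakeExchange(body));
        }
        return (WidgetPayload) aggregated.getBody();
    }
}
```

The strategy is called once per incoming message, not once per completed group, so it only ever sees the running aggregate and one new message. Whatever exchange it returns on the last call is what continues to the step inside the aggregate block, here the payloadProcessor bean.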