Reputation: 27988
I have flow where I read from a file, convert to CSV, and then convert each line to objects. So I end up with a List. Now I want to split this into smaller lists, and process each in parallel. I can use split() to get individual entries, but all my attempts at using aggregate has not resulted in a list, just single items.
from("file://")
.unmarshal(csvDataFormat)
.to("bean:personReader")
.split(body())
.aggregate( ??? )
.to("bean:send")
Upvotes: 3
Views: 2221
Reputation: 27988
I ended up using a custom aggregate strategy, seeing as the AbstractListAggregationStrategy didn't quite fit either:
class ListAggregationStrategy implements AggregationStrategy {
@Override
public Exchange aggregate(final Exchange oldExchange, final Exchange newExchange) {
final Object value = newExchange.getIn().getBody();
if (oldExchange == null) {
newExchange.getIn().setBody(Lists.newArrayList(value));
return newExchange;
} else {
oldExchange.getIn().getBody(List.class).add(value);
return oldExchange;
}
}
}
Then: from("direct:source") .aggregate() .constant(true) .completionSize(1000) .aggregationStrategy(new ListAggregationStrategy()) .to("direct:target");
Upvotes: 0