Reputation: 121
Camel 3.21 & String Boot 2.7.15
public void configure() throws Exception {
AggregationStrategy arrayListStrategy = AggregationStrategies.flexible(Document.class)
.accumulateInCollection(ArrayList.class)
.pick(body());
BiFunction<String, Integer, List<Document>> generate = (type, size) ->
IntStream.range(1, size +1)
.mapToObj(i -> new Document("type", type).append("id", "id" + i))
.toList();
from("timer://timer?repeatCount=1")
.to("direct:route-1");
from("direct:route-1")
.process(exchange -> {
var documents = generate.apply("type-1", 300);
exchange.getMessage().setBody(documents);
})
.split(body()).streaming().aggregate(constant(true), arrayListStrategy).completionTimeout(3000)
.enrich("direct:route-2", new ConcatAllAggregationStrategy())
.log(LoggingLevel.DEBUG, LOG_NAME,"route-1 : ${body.size()}");
from("direct:route-2")
.process(exchange -> {
var documents = generate.apply("type-2", 500);
exchange.getMessage().setBody(documents);
})
.split(body()).streaming().aggregate(constant(true), arrayListStrategy).completionTimeout(3000)
.process(exchange -> {
var body = (List<Document>)exchange.getIn().getBody();
LOGGER.debug("route-2.processor : {}", body.size());
})
.log(LoggingLevel.DEBUG, LOG_NAME,"route-2 : ${body.size()}");
}
private class ConcatAllAggregationStrategy implements AggregationStrategy {
@Override
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
if (newExchange == null) {
return oldExchange;
}
var oldBody = (List<Document>)oldExchange.getIn().getBody();
var newBody = (List<Document>)newExchange.getIn().getBody();
var all = Stream.concat(oldBody.stream(), newBody.stream()).toList();
oldExchange.getIn().setBody(all);
return oldExchange;
}
}
The program output is as follows:
DEBUG [][Camel (camel-1) thread #4 - Aggregator] o.a.c.s.CamelLogger: route-1 : 800
DEBUG [][Camel (camel-1) thread #5 - Aggregator] o.e.c.AnotherBuilder: route-2.processor : 500
DEBUG [][Camel (camel-1) thread #5 - Aggregator] o.a.c.s.CamelLogger: route-2 : 500
Why does the log for route-2 appear after the log for route-1? I thought route-2 should execute completely before the enrich() aggregation strategy is called.
If I remove the line .split(body()).streaming().aggregate(constant(true), arrayListStrategy).completionTimeout(3000)
, it behaves as expected, and I get:
DEBUG [][Camel (camel-1) thread #1 - timer://timer] o.e.c.AnotherBuilder: route-2.processor : 500
DEBUG [][Camel (camel-1) thread #1 - timer://timer] o.a.c.s.CamelLogger: route-2 : 500
DEBUG [][Camel (camel-1) thread #1 - timer://timer] o.a.c.s.CamelLogger: route-1 : 800
How can I achieve this result while using split(body()).streaming()
followed by .aggregate()
?
Upvotes: 0
Views: 188
Reputation: 121
For those interested, the following code works as expected. I moved away from using enrich and instead implemented a new AggregationStrategy along with a recipientList.
from("timer://timer?repeatCount=1")
.recipientList(simple("seda:route-1,seda:route-2"))
.end();
from("seda:route-1")
.process(exchange -> {
var documents = generate.apply("type-1", 300);
exchange.getMessage().setBody(documents);
})
.split(body()).streaming().aggregate(constant(true), arrayListStrategy).completionTimeout(3000)
.to("seda:aggregate");
from("seda:route-2")
.process(exchange -> {
var documents = generate.apply("type-2", 500);
exchange.getMessage().setBody(documents);
})
.split(body()).streaming().aggregate(constant(true), arrayListStrategy).completionTimeout(3000)
.process(exchange -> {
var body = (List<Document>)exchange.getIn().getBody();
LOGGER.debug("route-2.processor : {}", body.size());
})
.log(LoggingLevel.DEBUG, LOG_NAME,"route-2 : ${body.size()}")
.to("seda:aggregate");
from("seda:aggregate")
.aggregate(constant(true), new ConcatAllAggregationStrategy()).completionSize(2)
.log(LoggingLevel.DEBUG, LOG_NAME," => 3 - body.size : ${body.size}");
private class ConcatAllAggregationStrategy implements AggregationStrategy {
@Override
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
var newBody = (List<Document>)newExchange.getIn().getBody();
List<Document> oldBody = null;
if (oldExchange == null) {
oldBody = newBody;
newExchange.getIn().setBody(oldBody);
return newExchange;
} else {
oldBody = (List<Document>)oldExchange.getIn().getBody();
}
var all = Stream.concat(oldBody.stream(), newBody.stream()).toList();
oldExchange.getIn().setBody(all);
return oldExchange;
}
}
The program output is as follows:
DEBUG [][Camel (camel-1) thread #10 - Aggregator] o.e.c.AnotherBuilder: route-2.processor : 500
DEBUG [][Camel (camel-1) thread #10 - Aggregator] o.a.c.s.CamelLogger: route-2 : 500
DEBUG [][Camel (camel-1) thread #11 - Aggregator] o.a.c.s.CamelLogger: => 3 - body.size : 800
Upvotes: 0