T.Rex
T.Rex

Reputation: 121

Apache Camel - How to use enrich() after split(body()).streaming().aggregate?

Camel 3.21 & String Boot 2.7.15

public void configure() throws Exception {

    AggregationStrategy arrayListStrategy = AggregationStrategies.flexible(Document.class)
            .accumulateInCollection(ArrayList.class)
            .pick(body());

    BiFunction<String, Integer, List<Document>> generate = (type, size) ->
            IntStream.range(1, size +1)
            .mapToObj(i -> new Document("type", type).append("id", "id" + i))
            .toList();

    from("timer://timer?repeatCount=1")
            .to("direct:route-1");

    from("direct:route-1")
            .process(exchange -> {
                var documents = generate.apply("type-1", 300);
                exchange.getMessage().setBody(documents);
            })
            .split(body()).streaming().aggregate(constant(true), arrayListStrategy).completionTimeout(3000)
            .enrich("direct:route-2", new ConcatAllAggregationStrategy())
            .log(LoggingLevel.DEBUG, LOG_NAME,"route-1 : ${body.size()}");

    from("direct:route-2")
            .process(exchange -> {
                var documents = generate.apply("type-2", 500);
                exchange.getMessage().setBody(documents);
            })
            .split(body()).streaming().aggregate(constant(true), arrayListStrategy).completionTimeout(3000)
            .process(exchange -> {
                var body = (List<Document>)exchange.getIn().getBody();
                LOGGER.debug("route-2.processor : {}", body.size());
            })
            .log(LoggingLevel.DEBUG, LOG_NAME,"route-2 : ${body.size()}");
}


private class ConcatAllAggregationStrategy implements AggregationStrategy {

    @Override
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        if (newExchange == null) {
            return oldExchange;
        }
        var oldBody = (List<Document>)oldExchange.getIn().getBody();
        var newBody = (List<Document>)newExchange.getIn().getBody();
        var all = Stream.concat(oldBody.stream(), newBody.stream()).toList();
        oldExchange.getIn().setBody(all);
        return oldExchange;
    }
}

The program output is as follows:

DEBUG [][Camel (camel-1) thread #4 - Aggregator] o.a.c.s.CamelLogger: route-1 : 800
DEBUG [][Camel (camel-1) thread #5 - Aggregator] o.e.c.AnotherBuilder: route-2.processor : 500
DEBUG [][Camel (camel-1) thread #5 - Aggregator] o.a.c.s.CamelLogger: route-2 : 500

Why does the log for route-2 appear after the log for route-1? I thought route-2 should execute completely before the enrich() aggregation strategy is called.

If I remove the line .split(body()).streaming().aggregate(constant(true), arrayListStrategy).completionTimeout(3000), it behaves as expected, and I get:

DEBUG [][Camel (camel-1) thread #1 - timer://timer] o.e.c.AnotherBuilder: route-2.processor : 500
DEBUG [][Camel (camel-1) thread #1 - timer://timer] o.a.c.s.CamelLogger: route-2 : 500
DEBUG [][Camel (camel-1) thread #1 - timer://timer] o.a.c.s.CamelLogger: route-1 : 800

How can I achieve this result while using split(body()).streaming() followed by .aggregate() ?

Upvotes: 0

Views: 188

Answers (1)

T.Rex
T.Rex

Reputation: 121

For those interested, the following code works as expected. I moved away from using enrich and instead implemented a new AggregationStrategy along with a recipientList.

from("timer://timer?repeatCount=1")
    .recipientList(simple("seda:route-1,seda:route-2"))
    .end();

from("seda:route-1")
        .process(exchange -> {
            var documents = generate.apply("type-1", 300);
            exchange.getMessage().setBody(documents);
        })
        .split(body()).streaming().aggregate(constant(true), arrayListStrategy).completionTimeout(3000)
        .to("seda:aggregate");

from("seda:route-2")
        .process(exchange -> {
            var documents = generate.apply("type-2", 500);
            exchange.getMessage().setBody(documents);
        })
        .split(body()).streaming().aggregate(constant(true), arrayListStrategy).completionTimeout(3000)
        .process(exchange -> {
            var body = (List<Document>)exchange.getIn().getBody();
            LOGGER.debug("route-2.processor : {}", body.size());
        })
        .log(LoggingLevel.DEBUG, LOG_NAME,"route-2 : ${body.size()}")
        .to("seda:aggregate");

from("seda:aggregate")
        .aggregate(constant(true), new ConcatAllAggregationStrategy()).completionSize(2)
        .log(LoggingLevel.DEBUG, LOG_NAME," => 3 - body.size : ${body.size}");
        
private class ConcatAllAggregationStrategy implements AggregationStrategy {
    @Override
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        var newBody = (List<Document>)newExchange.getIn().getBody();
        List<Document> oldBody = null;
        if (oldExchange == null) {
            oldBody = newBody;
            newExchange.getIn().setBody(oldBody);
            return newExchange;
        } else {
            oldBody = (List<Document>)oldExchange.getIn().getBody();
        }
        var all = Stream.concat(oldBody.stream(), newBody.stream()).toList();
        oldExchange.getIn().setBody(all);
        return oldExchange;
    }
}
        

The program output is as follows:

DEBUG [][Camel (camel-1) thread #10 - Aggregator] o.e.c.AnotherBuilder: route-2.processor : 500
DEBUG [][Camel (camel-1) thread #10 - Aggregator] o.a.c.s.CamelLogger: route-2 : 500
DEBUG [][Camel (camel-1) thread #11 - Aggregator] o.a.c.s.CamelLogger:  => 3 - body.size : 800

Upvotes: 0

Related Questions