Mashrur

Reputation: 535

Camel aggregator does not aggregate all

I am splitting some Java objects and then aggregating them, and I am confused about how the completion strategy works in Camel (2.15.2). I am using both completion size and completion timeout. If I understand correctly, the completion timeout should not have much effect here, because there is not much waiting going on.

Altogether, I have 3000+ objects, but it seems only some of them are aggregated. If I vary the completion size, the situation changes: with a size of 100, around 800 are aggregated, and with a size of 200, up to around 1600. I don't know the number of objects in advance, so I cannot rely on an assumed number.

Can anyone please explain what I am doing wrong here? If I use eagerCheckCompletion, it aggregates the whole thing in one go, which I don't want. Below is my route:

    from("direct:specializeddatavalidator")
        .to("bean:headerFooterValidator")
        .split(body())
        .process(rFSStatusUpdater)
        .process(dataValidator)
        .choice()
            .when(header("discrepencyList").isNotNull()).to("seda:errorlogger")
            .otherwise().to("seda:liveupdater")
        .end();


    from("seda:liveupdater?concurrentConsumers=4&timeout=5000")
        .aggregate(simple("${in.header.contentType}"),
                batchAggregationStrategy())
        .completionSize(MAX_RECORDS)
        .completionTimeout(BATCH_TIME_OUT).to("bean:liveDataUpdater");

    from("seda:errorlogger?concurrentConsumers=4")
        .aggregate(simple("${in.header.contentType}"),
                batchAggregationStrategy("discrepencyList"))
        .completionSize(MAX_RECORDS_FOR_ERRORS)
        .completionTimeout(BATCH_TIME_OUT)
        .process(errorProcessor).to("bean:liveDataUpdater");
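
For reference, the batching arithmetic implied by a completion size can be sketched in plain Java. This is not Camel code and does not reproduce the route above; it only simulates the general rule that a group completes each time its count reaches the completion size, while any leftover messages complete only once the completion timeout fires. The class name `BatchBySize` and the figure of 3050 items are hypothetical, chosen only to stand in for "3000+".

```java
import java.util.ArrayList;
import java.util.List;

public class BatchBySize {
    // Groups items into batches of at most completionSize elements.
    // Simulation only: a real aggregator would hold the final partial
    // batch until its timeout elapses.
    public static <T> List<List<T>> batch(List<T> items, int completionSize) {
        List<List<T>> batches = new ArrayList<>();
        List<T> current = new ArrayList<>();
        for (T item : items) {
            current.add(item);
            if (current.size() == completionSize) {
                batches.add(current);          // completed by size
                current = new ArrayList<>();
            }
        }
        if (!current.isEmpty()) {
            batches.add(current);              // remainder: would complete by timeout
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Integer> items = new ArrayList<>();
        for (int i = 0; i < 3050; i++) {       // hypothetical item count
            items.add(i);
        }
        List<List<Integer>> batches = batch(items, 100);
        int last = batches.get(batches.size() - 1).size();
        System.out.println(batches.size() + " batches, last has " + last);
    }
}
```

Under these assumptions, every item should end up in some batch regardless of the completion size, so a total that scales with the size (800 at 100, 1600 at 200) suggests something other than the size setting is limiting the count.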

Upvotes: 1

Views: 2128

Answers (1)

J2B

Reputation: 1753

Weird, but if you want to aggregate all the split messages, you can simply use

    .split(body(), batchAggregationStrategy())

And depending on how you want it to work you can use

    .shareUnitOfWork().stopOnException()

See http://camel.apache.org/splitter.html for more info
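
To illustrate the idea behind passing an aggregation strategy to the splitter, here is a minimal plain-Java sketch. It does not use the Camel API. It only mirrors the shape of an aggregation strategy, which is called with the result so far (null on the first call) and the next processed part, and returns the new result. The names `SplitAndFold`, `COLLECT` and `splitAndAggregate` are hypothetical, and the upper-casing step is just a placeholder for per-record processing.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.function.BinaryOperator;

public class SplitAndFold {
    // Hypothetical stand-in for an aggregation strategy: takes the result
    // so far (null on the first call) and the next part, returns the new result.
    public static final BinaryOperator<List<String>> COLLECT = (soFar, next) -> {
        if (soFar == null) {
            return new ArrayList<>(next);
        }
        soFar.addAll(next);
        return soFar;
    };

    // Splits the body into single parts, processes each one,
    // and folds the processed parts back into one result.
    public static List<String> splitAndAggregate(List<String> body) {
        List<String> result = null;
        for (String part : body) {
            List<String> processed = new ArrayList<>();
            processed.add(part.toUpperCase(Locale.ROOT)); // placeholder processing
            result = COLLECT.apply(result, processed);
        }
        return result; // stays null if the body was empty
    }

    public static void main(String[] args) {
        System.out.println(splitAndAggregate(Arrays.asList("a", "b", "c")));
    }
}
```

Because the fold runs inside the split itself, the result is complete as soon as the last part has been processed, with no completion size or timeout to tune.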

Upvotes: 2
