Reputation: 535
I am splitting some Java objects and then aggregating them, and I am somewhat confused about how the completion strategy works in Camel (2.15.2). I am using both completion size and completion timeout. If I understand correctly, the completion timeout should not have much effect here, because there is not much waiting going on.
Altogether I have 3000+ objects, but only part of them get aggregated. If I vary the completion size value, the situation changes: with a size of 100 it aggregates around 800, and with 200 it aggregates up to around 1600. However, I don't know the number of objects in advance, so I cannot rely on an assumed value.
Can anyone explain what I am doing wrong here? If I use eagerCheckCompletion, it aggregates everything in one go, which I don't want. Below are my routes:
from("direct:specializeddatavalidator")
.to("bean:headerFooterValidator").split(body())
.process(rFSStatusUpdater)
.process(dataValidator).choice()
.when(header("discrepencyList").isNotNull()).to("seda:errorlogger")
.otherwise().to("seda:liveupdater").end();
from("seda:liveupdater?concurrentConsumers=4&timeout=5000")
.aggregate(simple("${in.header.contentType}"),
batchAggregationStrategy())
.completionSize(MAX_RECORDS)
.completionTimeout(BATCH_TIME_OUT).to("bean:liveDataUpdater");
from("seda:errorlogger?concurrentConsumers=4")
.aggregate(simple("${in.header.contentType}"),
batchAggregationStrategy("discrepencyList"))
.completionSize(MAX_RECORDS_FOR_ERRORS)
.completionTimeout(BATCH_TIME_OUT)
.process(errorProcessor).to("bean:liveDataUpdater");
Upvotes: 1
Views: 2128
Reputation: 1753
Odd as it may sound, if you want to aggregate all the split messages you can simply use
.split(body(), batchAggregationStrategy())
Depending on how you want it to behave, you can also add
.shareUnitOfWork().stopOnException()
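As a rough illustration, the first route from the question could aggregate inside the splitter itself instead of routing to separate seda aggregators. This is only a sketch under the assumption that batchAggregationStrategy() returns an AggregationStrategy and that rFSStatusUpdater / dataValidator are the same processors used above:
// Sketch: aggregate within the splitter using the strategy, assuming
// batchAggregationStrategy() returns an org.apache.camel.processor.aggregate.AggregationStrategy
from("direct:specializeddatavalidator")
    .to("bean:headerFooterValidator")
    .split(body(), batchAggregationStrategy())   // every split message is folded back into one exchange
        .shareUnitOfWork()                        // child exchanges share the parent's unit of work
        .process(rFSStatusUpdater)
        .process(dataValidator)
    .end()                                        // after end(), the aggregated exchange continues
    .to("bean:liveDataUpdater");
With this approach the splitter knows how many messages there are, so you don't need to guess a completion size.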
See http://camel.apache.org/splitter.html for more info
Upvotes: 2