Reputation:
I effectively want a flush, or a completionSize that applies to all the aggregations in the aggregator: a global completionSize.
Basically, I want every message that arrives in a batch to be aggregated, and then have all the aggregations in that aggregator complete at once when the last message has been read.
e.g. 1000 messages arrive (the total is not known beforehand)
and are aggregated on correlation id into bins (the size of each bin is not known beforehand either):
A: 300
B: 400
C: 300
I want the aggregator not to complete until the 1000th exchange has been aggregated;
at that point I want all of the aggregations in the aggregator to complete at once.
Unfortunately, completionSize applies to each aggregation (each correlation group), not to the aggregator as a whole. So if I set completionSize(1000) it will never finish, since each aggregation would have to reach 1000 exchanges on its own before it is 'complete'.
I could get around it by building up a single Map object, but that sidesteps the correlation in aggregator2, which I would ideally prefer to use.
So, either a global completion size or a flush: is there a way to do this intelligently?
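For reference, the single-Map workaround mentioned above could look roughly like the sketch below. This is plain Java, not Camel: the class name BatchBins and its methods are made up for illustration, and it only shows the behaviour I am after (bin by correlation id, release everything together when the batch ends).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the "single Map" workaround: bins are keyed by
// correlation id and nothing is released until the whole batch has been read.
final class BatchBins {
    private final Map<String, List<String>> bins = new LinkedHashMap<>();

    // Add one message body to the bin for its correlation id.
    void add(String correlationId, String body) {
        bins.computeIfAbsent(correlationId, k -> new ArrayList<>()).add(body);
    }

    // Release every bin at once and reset the state for the next batch.
    Map<String, List<String>> flush() {
        Map<String, List<String>> done = new LinkedHashMap<>(bins);
        bins.clear();
        return done;
    }

    public static void main(String[] args) {
        BatchBins batch = new BatchBins();
        for (int i = 0; i < 300; i++) batch.add("A", "a" + i);
        for (int i = 0; i < 400; i++) batch.add("B", "b" + i);
        for (int i = 0; i < 300; i++) batch.add("C", "c" + i);
        // Only now, after the last message, are all bins completed together.
        batch.flush().forEach((id, msgs) -> System.out.println(id + " " + msgs.size()));
    }
}
```

It works, but the grouping is done by hand here, which is exactly the part I would rather leave to the aggregator's correlation expression.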
Upvotes: 2
Views: 2821
Reputation: 1
Using Exchange.AGGREGATION_COMPLETE_ALL_GROUPS_INCLUSIVE worked for me:

    from(endpoint)
        .unmarshal(csvFormat)
        .split(body())
            .bean(CsvProcessor())
            .choice()
                // If all messages are processed,
                // flush the aggregation
                .`when`(simple("\${property.CamelSplitComplete}"))
                    .setHeader(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS_INCLUSIVE, constant(true))
            .end()
            .aggregate(simple("\${body.trackingKey}"),
                    AggregationStrategies.bean(OrderAggregationStrategy()))
                .completionTimeout(10000)
Upvotes: 0
Reputation: 55750
I suggest taking a look at the Camel aggregator EIP doc, http://camel.apache.org/aggregator2, and reading about the different completion conditions. There is also the special message Ben refers to, which you can send to signal that all in-flight aggregates should complete.
If you consume from a batch consumer (http://camel.apache.org/batch-consumer.html), you can use a special completion that completes when the batch is done, for example when you pick up files, or rows from a JPA database table. When all messages from the batch consumer have been processed, the aggregator can signal completion for all of these aggregated messages, using the completionFromBatchConsumer option.
Also, if you have a copy of the Camel in Action book, read chapter 8, section 8.2, as it covers the aggregate EIP in much more detail.
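As a rough sketch of the batch consumer option (not taken from the question's setup): the route fragment below uses the file component as the batch consumer. The directory name, the correlationId header and the log endpoint are placeholders, and the imports assume the Camel 2.x package layout, so adjust them for your version.

```java
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.processor.aggregate.GroupedExchangeAggregationStrategy;

public class BatchAggregateRoute extends RouteBuilder {
    @Override
    public void configure() {
        // The file component is a batch consumer: each poll picks up a batch of files.
        from("file:inbox") // placeholder directory
            // Group on a correlation header; the header name is a placeholder
            // and is assumed to have been set earlier.
            .aggregate(header("correlationId"), new GroupedExchangeAggregationStrategy())
                // Complete the groups once the last exchange of the batch has been aggregated.
                .completionFromBatchConsumer()
            .to("log:aggregated"); // placeholder endpoint
    }
}
```

This only helps when the messages really come from a batch consumer. If they come out of a splitter instead, the signal header approach is probably the closer fit.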
Upvotes: 0
Reputation: 21005
One option is to simply add some logic that keeps a global counter and sets the Exchange.AGGREGATION_COMPLETE_ALL_GROUPS header once the expected count is reached.
From the docs: "Available as of Camel 2.9. You can manually complete all current aggregated exchanges by sending in a message containing the header Exchange.AGGREGATION_COMPLETE_ALL_GROUPS set to true. The message is considered a signal message only, the message headers/contents will not be processed otherwise."
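A minimal sketch of the counter idea, in plain Java so it stands alone. The class CompleteAllSignal is made up, a plain Map stands in for the message headers, and the header literal is what I believe Exchange.AGGREGATION_COMPLETE_ALL_GROUPS resolves to, so reference the constant itself in a real route. It also assumes the expected total becomes known at some point, which the question says is not the case up front.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper: counts messages and, once the expected total has been
// seen, produces the headers for a separate "complete all groups" signal message.
final class CompleteAllSignal {
    // Assumed to match Exchange.AGGREGATION_COMPLETE_ALL_GROUPS; use the constant in a real route.
    static final String COMPLETE_ALL_GROUPS = "CamelAggregationCompleteAllGroups";

    private final int expected;
    private final AtomicInteger seen = new AtomicInteger();

    CompleteAllSignal(int expected) {
        this.expected = expected;
    }

    // Call once per processed message. Returns an empty map until the
    // expected total is reached, then a map carrying the signal header.
    Map<String, Object> onMessage() {
        Map<String, Object> headers = new HashMap<>();
        if (seen.incrementAndGet() == expected) {
            headers.put(COMPLETE_ALL_GROUPS, true);
        }
        return headers;
    }
}
```

Since the signal message itself is not aggregated, the header belongs on an extra message sent after the last real one, not on the last real message.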
Upvotes: 3