Reputation: 261
Hi, my input file looks like this:
AAA,1
AAA,2
AAA,3
BBB,1
CCC,1
CCC,2
DDD,1
DDD,5
DDD,4
EEE,1
I want to split and aggregate this into multiple messages but keep associated records together, like this:
Message 1:
AAA,1
AAA,2
AAA,3
BBB,1
CCC,1
CCC,2
Message 2:
DDD,1
DDD,5
DDD,4
EEE,1
And prevent something like this:
Message 1:
AAA,1
AAA,2
AAA,3
BBB,1
CCC,1
Message 2:
CCC,2
DDD,1
DDD,5
DDD,4
EEE,1
CCC,2 should be written to Message 1, or CCC,1 should be written to Message 2.
The completionSize is not a fixed value; it should act as a threshold. For the example above, the rule would be: "Aggregate 5 records into a message; if further records beginning with CCC follow, put them into the same message too."
This is my route:
.split().tokenize("\n").streaming()
.aggregate().constant(true)
.aggregationStrategy(new MyAggregationStrategy())
.completionSize(5)
.completionTimeout(5000)
After the completionSize threshold of 5 is reached, MyAggregationStrategy has to check the next message (newExchange) and decide whether to aggregate it into the oldExchange, even if the size then exceeds 5. If the message is not aggregated into the oldExchange, the aggregation is complete and a new aggregation begins. How do I ensure that the message rejected by the previous aggregation becomes the first message of the new aggregation?
Since the input file may be very large, I want to use streaming rather than reading in the whole file first and then cutting it into single messages with a custom bean.
Upvotes: 1
Views: 1639
Reputation: 5369
Why not group only the related elements together? You are currently aggregating with .constant(true), meaning that there is only a single correlation group to aggregate. Instead, you can do something like:
.split().tokenize("\n").streaming()
.process(e -> ...) //extract the type (AAA, BBB, etc.) into a header called type
.aggregate(header("type"), new MyAggregationStrategy())
.completionTimeout(5000)
This way, only related messages would be a part of your aggregated batch.
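To make the effect of correlating on the type concrete, here is a minimal plain-Java sketch (not Camel code) of what grouping by that key does to the sample lines. The names TypeCorrelation, typeOf and groupByType are made up for illustration, and the sketch assumes the type is everything before the first comma.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Camel: mimics correlating records by their type.
class TypeCorrelation {

    // Assumption: the type is the text before the first comma ("AAA,1" -> "AAA").
    static String typeOf(String line) {
        return line.split(",", 2)[0];
    }

    // Collects lines into one group per type, preserving first-seen order.
    static Map<String, List<String>> groupByType(List<String> lines) {
        Map<String, List<String>> groups = new LinkedHashMap<>();
        for (String line : lines) {
            groups.computeIfAbsent(typeOf(line), k -> new ArrayList<>()).add(line);
        }
        return groups;
    }

    public static void main(String[] args) {
        List<String> lines = List.of("AAA,1", "AAA,2", "BBB,1", "CCC,1", "CCC,2");
        groupByType(lines).forEach((type, records) ->
                System.out.println(type + " -> " + records));
    }
}
```

Note that this gives one group per type (AAA, BBB, CCC, ...), so each batch holds a single type rather than the mixed, threshold-sized batches shown in the question.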
Upvotes: 1
Reputation: 55750
You can set eagerCheckCompletion to true, which will check for completion before aggregation. See more details in the docs: http://camel.apache.org/aggregator2
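As an illustration of why the check has to happen before the incoming record is added, here is a minimal plain-Java sketch of the decision rule from the question. It is not Camel code and does not use the aggregator API; ThresholdBatcher, keyOf and batch are hypothetical names, and the key is assumed to be the text before the first comma. How the rule maps onto the aggregator's options is left open.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical helper, not part of Camel: streams lines into batches that are
// closed once a size threshold is reached AND the next line starts a new key.
class ThresholdBatcher {
    private final int threshold;
    private final Consumer<List<String>> sink;
    private List<String> current = new ArrayList<>();

    ThresholdBatcher(int threshold, Consumer<List<String>> sink) {
        this.threshold = threshold;
        this.sink = sink;
    }

    // Assumption: the key is everything before the first comma.
    static String keyOf(String line) {
        int comma = line.indexOf(',');
        return comma < 0 ? line : line.substring(0, comma);
    }

    // Inspect the incoming line BEFORE adding it: if the batch is full and the
    // line belongs to a different key, emit the batch and start a new one.
    void accept(String line) {
        if (!current.isEmpty() && current.size() >= threshold) {
            String lastKey = keyOf(current.get(current.size() - 1));
            if (!keyOf(line).equals(lastKey)) {
                flush();
            }
        }
        current.add(line); // the "rejected" line becomes the first of the new batch
    }

    // Emit whatever is left, e.g. at end of input.
    void flush() {
        if (!current.isEmpty()) {
            sink.accept(current);
            current = new ArrayList<>();
        }
    }

    // Convenience wrapper for small inputs; real streaming would call accept() per line.
    static List<List<String>> batch(List<String> lines, int threshold) {
        List<List<String>> out = new ArrayList<>();
        ThresholdBatcher b = new ThresholdBatcher(threshold, out::add);
        lines.forEach(b::accept);
        b.flush();
        return out;
    }

    public static void main(String[] args) {
        List<String> input = List.of("AAA,1", "AAA,2", "AAA,3", "BBB,1", "CCC,1",
                "CCC,2", "DDD,1", "DDD,5", "DDD,4", "EEE,1");
        batch(input, 5).forEach(System.out::println);
    }
}
```

Only one line of look-ahead is needed, so memory stays bounded by the batch size and the file never has to be read in full.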
Upvotes: 1