nick78

Reputation: 261

Camel AggregationStrategy keep excluded message for next aggregation iteration

Hi, my input file looks like this:

AAA,1
AAA,2
AAA,3
BBB,1
CCC,1
CCC,2
DDD,1
DDD,5
DDD,4
EEE,1

I want to split this and aggregate it into multiple messages while keeping associated records together, like this:

Message 1:

AAA,1
AAA,2
AAA,3
BBB,1
CCC,1
CCC,2

Message 2:

DDD,1
DDD,5
DDD,4
EEE,1

And prevent something like this:

Message 1:

AAA,1
AAA,2
AAA,3
BBB,1
CCC,1

Message 2:

CCC,2
DDD,1
DDD,5
DDD,4
EEE,1

Either CCC,2 should be written to Message 1, or CCC,1 should be written to Message 2.

The completionSize is not a fixed size but should act as a threshold. For the example above it would work like this: "Aggregate 5 records into a message; if further records beginning with CCC follow, add them to the same message too."

This is my route:

.split().tokenize("\n").streaming()
.aggregate().constant(true)
.aggregationStrategy(new MyAggregationStrategy())
.completionSize(5)
.completionTimeout(5000)

Once the completionSize threshold of 5 is reached, MyAggregationStrategy has to check the next message (newExchange) and decide whether to aggregate it into the oldExchange, even if that makes the size larger than 5. If the message is not aggregated into the oldExchange, the aggregation is complete and a new aggregation begins. How do I ensure that the message rejected by the previous aggregation becomes the first message of the new aggregation?

Since the input file may be very large, I want to use streaming rather than reading the whole file first and cutting it into single messages with a custom bean.

Upvotes: 1

Views: 1639

Answers (2)

Miloš Milivojević

Reputation: 5369

Why not group only the related elements together? You are currently aggregating with .constant(true), which means there is only a single correlation group to aggregate. Instead, you can do something like:

.split().tokenize("\n").streaming()
.process(e -> ...)  // extract the type (AAA, BBB, etc.) into a header called "type"
.aggregate(header("type"), new MyAggregationStrategy())
.completionTimeout(5000)

This way, only related messages would be a part of your aggregated batch.
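
To make the effect of correlating by type concrete, here is a minimal plain-Java sketch with no Camel involved. It assumes the type is the text before the first comma; TypeGrouping, typeOf and groupByType are hypothetical names used only for illustration. Note that it yields one batch per type, which is not the same as the mixed-type messages shown in the question.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class TypeGrouping {
    // Hypothetical helper: treat the text before the first comma as the type.
    static String typeOf(String line) {
        int comma = line.indexOf(',');
        return comma < 0 ? line : line.substring(0, comma);
    }

    // Groups lines by type, keeping the order in which types first appear.
    static Map<String, List<String>> groupByType(List<String> lines) {
        Map<String, List<String>> groups = new LinkedHashMap<>();
        for (String line : lines) {
            groups.computeIfAbsent(typeOf(line), k -> new ArrayList<>()).add(line);
        }
        return groups;
    }

    public static void main(String[] args) {
        List<String> lines = List.of(
                "AAA,1", "AAA,2", "AAA,3", "BBB,1", "CCC,1",
                "CCC,2", "DDD,1", "DDD,5", "DDD,4", "EEE,1");
        // Prints one line per type together with its records.
        groupByType(lines).forEach((type, batch) -> System.out.println(type + " -> " + batch));
    }
}
```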

Upvotes: 1

Claus Ibsen

Reputation: 55750

You can set eagerCheckCompletion to true, which checks for completion before aggregating. See the docs for more details: http://camel.apache.org/aggregator2
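
The "check before aggregating" idea can be sketched in plain Java, independent of Camel. This is only an illustration of the threshold rule from the question, not Camel's implementation: ThresholdBatcher, keyOf and batch are hypothetical names, and the key is assumed to be the text before the first comma. Each incoming record is inspected before it is added; if the current batch has reached the threshold and the record starts a different key, the batch is closed and the record becomes the first entry of the next one.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class ThresholdBatcher {
    // Hypothetical helper: treat the text before the first comma as the key.
    static String keyOf(String line) {
        int comma = line.indexOf(',');
        return comma < 0 ? line : line.substring(0, comma);
    }

    // Reads records one at a time and closes a batch only when it holds at
    // least `threshold` records AND the incoming record has a different key
    // than the last record added. The incoming record then opens a new batch.
    static List<List<String>> batch(Iterator<String> lines, int threshold) {
        List<List<String>> batches = new ArrayList<>();
        List<String> current = new ArrayList<>();
        String lastKey = null;
        while (lines.hasNext()) {
            String line = lines.next();
            String key = keyOf(line);
            // Check for completion before adding the incoming record.
            if (current.size() >= threshold && !key.equals(lastKey)) {
                batches.add(current);
                current = new ArrayList<>();
            }
            current.add(line);
            lastKey = key;
        }
        // Flush whatever is left when the input ends.
        if (!current.isEmpty()) {
            batches.add(current);
        }
        return batches;
    }

    public static void main(String[] args) {
        List<String> lines = List.of(
                "AAA,1", "AAA,2", "AAA,3", "BBB,1", "CCC,1",
                "CCC,2", "DDD,1", "DDD,5", "DDD,4", "EEE,1");
        for (List<String> b : batch(lines.iterator(), 5)) {
            System.out.println(b);
        }
    }
}
```

For a very large file, the sketch would need to hand each finished batch to a consumer instead of collecting all batches in a list; the list is used here only to keep the example short.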

Upvotes: 1
