Satish Kumar
Satish Kumar

Reputation: 51

Issue with Spring Integration Aggregator group-timeout value

We are using Spring Integration 4.2.3 Aggregator component and defined group-timeout and expecting the group to be timed out within the given timeout value while adding messages to the group & release size criteria is not met.

But we are seeing different results, when we input heavy load to the service the aggregator is waiting on all messages to be added to the group rather than expiring the group when the timeout reached.

Is there any way to override the aggregator functionality to look at the first message rather than last message when timing out group.

Upvotes: 3

Views: 2151

Answers (3)

Satish Kumar
Satish Kumar

Reputation: 51

we debugged the issue with your group-timeout-expression(timestamp + 20000 - T(System).currentTimeMillis()) and found out that the expression is evaluating to a negative value after messaged keep flowing in thus causing the group never getting released.

The code block where the issue is in AbstractCorrelatingMessageHandler.java

enter image description here

Once we removed the condition of "groupTimeout >= 0", now the group is getting expired because of the else block. The code is now behaving like how we expected.

Could you let me know why you are not forcing the group to be timedoout when it reaches negative value?

Upvotes: 0

Artem Bilan
Artem Bilan

Reputation: 121552

Well, actually you can do what you need even now. Using the same group-timeout-expression. But you have to consult the #root object of the evaluation context which is exactly what you need - MessageGroup. With that you can call one of for your purpose:

/**
 * @return the timestamp (milliseconds since epoch) associated with the creation of this group
 */
long getTimestamp();

/**
 * @return the timestamp (milliseconds since epoch) associated with the time this group was last updated
 */
long getLastModified();

Therefore an expression for your original request might be like:

group-timeout-expression="timestamp + 10000 - T(System).currentTimeMillis()"

And we get that adjusted timeout which will be applied to scheduled task with the value like: new Date(System.currentTimeMillis() + groupTimeout));.

Upvotes: 3

Gary Russell
Gary Russell

Reputation: 174759

No; the timeout is currently based on the arrival of the last message only.

If you use a MessageGroupStoreReaper instead, the time is based on the group creation by default, but that can be changed by setting the group store's timeoutOnIdle to true.

If your group is not timing out at all, perhaps the thread pool in the default taskScheduler is exhausted - it only has 10 threads by default.

You can increase the pool size or inject a dedicated scheduler into the aggregator.

Upvotes: 0

Related Questions