Reputation: 329
I have noticed (code decompile) that when a timeout is set on a aggreator the whole message group is being stored in a future (in memory) and not in the storage. This causes at times "Out Of Memory" exceptions when a high throughput happens.
Is there a better way of handling this?
<aggregator input-channel="orderNotificationLoadBalancedExecutorChannelLATAM" output-channel="orderNotificationConverterChannelLATAM"
message-store="orderNotificationGroupStoreLATAM"
send-partial-result-on-expiry="true"
ref="firstOnlyPrimaryKeyMessageAggregator"
method="aggregate"
correlation-strategy-expression="headers['erpKeyMap']['erpKey']"
release-strategy-expression="#this[0].headers['tableName'].topLevel and #this[0].headers['operationType'].operationTypeDelete"
expire-groups-upon-completion="true"
expire-groups-upon-timeout="true"
group-timeout="5000">
</aggregator>
Upvotes: 2
Views: 419
Reputation: 121560
Oops!
Looks like a bug. I've just raised a JIRA on the matter.
The guilty code looks like:
private void scheduleGroupToForceComplete(final MessageGroup messageGroup) {
...
ScheduledFuture<?> scheduledFuture = this.getTaskScheduler()
.schedule(new Runnable() {
@Override
public void run() {
try {
forceReleaseProcessor.processMessageGroup(messageGroup);
}
catch (MessageDeliveryException e) {
if (logger.isDebugEnabled()) {
logger.debug("The MessageGroup [ " + messageGroup +
"] is rescheduled by the reason: " + e.getMessage());
}
scheduleGroupToForceComplete(messageGroup);
}
}
}, new Date(System.currentTimeMillis() + groupTimeout));
So, the ScheduledFuture
holds the reference to the final MessageGroup
via that inline Runnable
callback.
I think we will fix it using only the groupId
.
Sorry, there is no any workarounds...
Upvotes: 2
Reputation: 22516
You can set a message store. See the here.
- A reference to a MessageGroupStore used to store groups of messages under their correlation key until they are complete. Optional, by default a volatile in-memory store.
Upvotes: 1