Benno
Benno

Reputation: 329

Spring Integration timeout with Aggregator in memory

I have noticed (code decompile) that when a timeout is set on a aggreator the whole message group is being stored in a future (in memory) and not in the storage. This causes at times "Out Of Memory" exceptions when a high throughput happens.

Is there a better way of handling this?

<aggregator input-channel="orderNotificationLoadBalancedExecutorChannelLATAM" output-channel="orderNotificationConverterChannelLATAM"
            message-store="orderNotificationGroupStoreLATAM"
            send-partial-result-on-expiry="true"
            ref="firstOnlyPrimaryKeyMessageAggregator"
            method="aggregate"
            correlation-strategy-expression="headers['erpKeyMap']['erpKey']"
            release-strategy-expression="#this[0].headers['tableName'].topLevel and #this[0].headers['operationType'].operationTypeDelete"
            expire-groups-upon-completion="true"
            expire-groups-upon-timeout="true"
            group-timeout="5000">
</aggregator>

Upvotes: 2

Views: 419

Answers (2)

Artem Bilan
Artem Bilan

Reputation: 121560

Oops!

Looks like a bug. I've just raised a JIRA on the matter.

The guilty code looks like:

private void scheduleGroupToForceComplete(final MessageGroup messageGroup) {
...
    ScheduledFuture<?> scheduledFuture = this.getTaskScheduler()
        .schedule(new Runnable() {

            @Override
            public void run() {
                try {
                    forceReleaseProcessor.processMessageGroup(messageGroup);
                    }
                    catch (MessageDeliveryException e) {
                        if (logger.isDebugEnabled()) {
                            logger.debug("The MessageGroup [ " + messageGroup +
                                "] is rescheduled by the reason: " + e.getMessage());
                        }
                        scheduleGroupToForceComplete(messageGroup);
                    }
                }
            }, new Date(System.currentTimeMillis() + groupTimeout));

So, the ScheduledFuture holds the reference to the final MessageGroup via that inline Runnable callback.

I think we will fix it using only the groupId.

Sorry, there is no any workarounds...

Upvotes: 2

Evgeni Dimitrov
Evgeni Dimitrov

Reputation: 22516

You can set a message store. See the here.

  1. A reference to a MessageGroupStore used to store groups of messages under their correlation key until they are complete. Optional, by default a volatile in-memory store.

Upvotes: 1

Related Questions