Jakub
Jakub

Reputation: 111

Memory leak outgoing messages rabbitmq

I have a Spring Boot application that is using Spring Integration. And I have problem with increasing memory over time. When I run profiler I can see problem in outbound endpoint when it seems like confirmation is not received. When I try debug the handling of confirmation locally everything looks ok. It appears only in k8s environment where a lot of messages are being sent (Maybe that is why local env is without problem). The error seems it appears after upgrade of springboot from version 1.6 to 2.3 (and all dependent dependencies). So that why I think it some missconfiguration but I am not sure where or why.

Versions:

Configuration:

protected RabbitTemplate rabbitTemplate() {
    RabbitTemplate rabbitTemplate = new RabbitTemplate();
    rabbitTemplate.setConnectionFactory(connectionFactory);
    rabbitTemplate.setMessageConverter(new ContentTypeDelegatingMessageConverter());
    
    rabbitTemplate.setChannelTransacted(true);

    rabbitTemplate.setMandatory(amqpMandatoryFlag);
    return rabbitTemplate;
}

return IntegrationFlows.from(inputFlowChannel)
                .transform(messageRequestsTransformer())
                .transform(new ObjectToStringTransformer())
                .enrichHeaders(headers)
                .log(LoggingHandler.Level.INFO, "amqpOutboundConnectorLogging",
                        "headers.id + ': outboundAMQPPayload=' + payload")
                .handle(Amqp.outboundAdapter(rabbitTemplate())
                        .confirmCorrelationExpression("payload")
                        .returnChannel(returnChannel)
                        .exchangeName(amqpExchangeTarget)
                        .defaultDeliveryMode(messageDeliveryMode)
                        .headersMappedLast(true)
                );

public ConnectionFactory connectionFactory() {
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
    connectionFactory.setUri(this.amqpBrokerUri);
    connectionFactory.setChannelCacheSize(this.amqpChannelCacheSize);
    connectionFactory.setCacheMode(CacheMode.CHANNEL);
    connectionFactory.setRequestedHeartBeat(60);
    connectionFactory.setPublisherReturns(true);
    return connectionFactory;
}

Profiler screen:enter image description here

If some other configuration is need I will add it.

Thank you for any advice.

Upvotes: 0

Views: 1053

Answers (1)

Gary Russell
Gary Russell

Reputation: 174494

Thanks for the reproducer; there are several issues; the root cause is using transactions with returns.

You can't use publisher confirms with transactions and the confirms/returns are designed to work together; there is nothing to trigger the removal of the pending returns.

Scheduling a call to template.getUnconfirmed() would normally help with clearing the pending returns, but it doesn't work.

There are several contributing factors as to why the memory is leaked, not least of which is the channel doesn't increment the nextPublishSeqNo so the pendingConfirms indexing is broken; the pendingConfirms map is used to trigger the cleanup so we still leave pendingReturns around.

The workaround (for now) is to use publisher confirms instead of transactions (or disable returns), but I will see if I can come up with a workaround while using transactions.

connectionFactory.setPublisherConfirmType(ConfirmType.CORRELATED);

EDIT

I found the issue; the problem is you are telling the adapter to create correlation data for each request...

.confirmCorrelationExpression("payload")

...this is an instruction to the template to maintain the pending confirms/returns maps, which are never cleared out because confirms cannot be enabled with transactions.

Remove this and the adapter won't configure the template for confirms.

https://github.com/spring-projects/spring-amqp/issues/1439

Upvotes: 1

Related Questions