Mathias Dpunkt

Reputation: 12184

spring-amqp transaction semantics

I am currently testing a fairly simple example that combines messaging transactions with database transactions using Spring AMQP.

The use case is as follows:

The expected behavior when the database operation fails is that the received message is rolled back to the broker without being requeued (DefaultRequeueRejected = false) and ends up in a dead letter queue. The message that was sent should be rolled back as well.

I can achieve this with the following configuration:

@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory, MessageConverter messageConverter) {
    RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
    rabbitTemplate.setMessageConverter(messageConverter);
    rabbitTemplate.setChannelTransacted(true);
    return rabbitTemplate;
}

@Bean
SimpleMessageListenerContainer subscriberListenerContainer(ConnectionFactory connectionFactory,
                                                           MessageListenerAdapter listenerAdapter,
                                                           PlatformTransactionManager transactionManager) {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    container.setQueueNames(SUBSCRIBER_QUEUE_NAME);
    container.setMessageListener(listenerAdapter);
    container.setChannelTransacted(true);
    container.setTransactionManager(transactionManager);
    container.setDefaultRequeueRejected(false);
    return container;
}

So this works fine. What I do not understand is that the observed behavior is exactly the same if I do not set the transaction manager on the SimpleMessageListenerContainer. With the following configuration the behavior does not change:

@Bean
SimpleMessageListenerContainer subscriberListenerContainer(ConnectionFactory connectionFactory,
                                                           MessageListenerAdapter listenerAdapter) {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    container.setQueueNames(SUBSCRIBER_QUEUE_NAME);
    container.setMessageListener(listenerAdapter);
    container.setDefaultRequeueRejected(false);
    return container;
}

Can someone explain what is happening here? Why does the second case also work? What is different internally when the PlatformTransactionManager is registered on the SimpleMessageListenerContainer?

Upvotes: 5

Views: 4297

Answers (1)

Gary Russell

Reputation: 174739

Assuming the transactionManager is your database tm, since your listener is @Transactional, there's not a lot of difference for these scenarios.

In the first case, the transaction is started by the container before the listener is invoked (actually before the message is retrieved from an internal queue so a transaction will start even if there's no message).

In the second case, the transaction is started by the transaction interceptor when we invoke the listener.

Consider the case where the listener is not transactional, but some downstream component is; let's say the listener invokes that component successfully, then does some more work before throwing an exception. In that case, the DB commit would be successful and the message rejected. This might not be the desired behavior, especially if messages are requeued. In cases like this, it's generally better to synchronize the rabbit transaction with the database transaction by injecting the database tm.
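The ordering problem described above can be illustrated with a small toy model. This is only a sketch in plain Java: `ToyDatabase` and `ToyDelivery` are hypothetical stand-ins invented for illustration, not Spring AMQP or RabbitMQ classes. It assumes the downstream component uses Spring's default REQUIRED propagation, so it joins a transaction that the container has already started instead of committing on its own.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a database with one open transaction at a time.
class ToyDatabase {
    final List<String> committed = new ArrayList<>();
    private final List<String> pending = new ArrayList<>();

    void write(String row) { pending.add(row); }
    void commit() { committed.addAll(pending); pending.clear(); }
    void rollback() { pending.clear(); }
}

// Hypothetical stand-in for one message delivery to a non-transactional listener
// that calls a transactional downstream component and then does more work.
class ToyDelivery {
    // containerOwnsDbTx models injecting the database tm into the container.
    // failLate models the listener throwing after the downstream call returned.
    // Returns "ack" or "reject" for the message.
    static String deliver(ToyDatabase db, boolean containerOwnsDbTx, boolean failLate) {
        try {
            db.write("row-from-downstream-component");
            if (!containerOwnsDbTx) {
                // Without an outer transaction, the downstream component
                // commits as soon as its own method returns.
                db.commit();
            }
            if (failLate) {
                throw new IllegalStateException("listener failed after downstream call");
            }
            if (containerOwnsDbTx) {
                // The container commits only after the listener returned normally.
                db.commit();
            }
            return "ack";
        } catch (RuntimeException e) {
            // Discards uncommitted work only; an earlier commit cannot be undone.
            db.rollback();
            return "reject";
        }
    }
}
```

In this model, `ToyDelivery.deliver(db, false, true)` rejects the message but leaves the row committed, which is the mismatch described above. `ToyDelivery.deliver(db, true, true)` rejects the message and leaves nothing committed, because the database work and the message outcome are decided together.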

In your case, there is little chance of a failure between the db commit and the rabbit ack, so this doesn't really apply and you don't need a tm in the container.

Upvotes: 5
