lukisp
lukisp

Reputation: 1081

@Transactional rabbit listener cause infinite loop

Hi I have problem with my Rabbit listener which cause infinite loop on exception (requeue message). My configuration looks:

@Bean(name = "defContainer")
public RabbitListenerContainerFactory containerFactory(ConnectionFactory connectionFactory, PlatformTransactionManager transactionManager){
    SimpleRabbitListenerContainerFactory containerFactory = new SimpleRabbitListenerContainerFactory();
    containerFactory.setConnectionFactory(connectionFactory);
    containerFactory.setConcurrentConsumers(5);
    containerFactory.setAcknowledgeMode(AcknowledgeMode.AUTO);
    containerFactory.setTransactionManager(transactionManager);
    containerFactory.setMessageConverter(messageConverterAmqp());
    containerFactory.setDefaultRequeueRejected(false);

    return new TxRabbitListenerContainerFactory(containerFactory);
}

where transactionManager is JpaTransactionManager for transaction on postgre db. TxRabbitListenerContainerFactory is my factory which set setAlwaysRequeueWithTxManagerRollback to false:

public class TxRabbitListenerContainerFactory implements RabbitListenerContainerFactory {
private SimpleRabbitListenerContainerFactory factory;

public TxRabbitListenerContainerFactory(SimpleRabbitListenerContainerFactory factory) {
    this.factory = factory;
}

@Override
public MessageListenerContainer createListenerContainer(RabbitListenerEndpoint endpoint) {
    SimpleMessageListenerContainer container = factory.createListenerContainer(endpoint);
    container.setAlwaysRequeueWithTxManagerRollback(false);
    return container;
}

}

Now I have listner like:

@RabbitListener(bindings = @QueueBinding(
        value = @Queue(value = "topic.two", durable = "true"),
        exchange = @Exchange(value = "topic.def", type = "topic", durable = "true"),
        key = "letter.*"
), errorHandler = "rabErrorHandler", containerFactory = "defContainer")
@Transactional
public Motorcycle topicLetters(Motorcycle motorcycle) throws Exception{
    motorcycle.setId(UUID.randomUUID().toString());
    Testing testing = new Testing();
    testingRepository.save(testing);
    throwEx();
    return motorcycle;
}

where method throwEx(); throw unchecked exception. Data from DB are properly rollbacked (not commited), but message are constantly requeued, see it in listener:

@Bean
public RabbitListenerErrorHandler rabErrorHandler(){
    return new RabbitListenerErrorHandler() {
        @Override
        public Object handleError(Message message, org.springframework.messaging.Message<?> message1, ListenerExecutionFailedException e) throws Exception {
            System.out.println("FFFFFFFFFFF");
            return null;
        }
    };
}

How to prevvent infinite loope, and why is it happend ?

EDIT:

Logs: pasted logs

Upvotes: 2

Views: 2462

Answers (2)

lukisp
lukisp

Reputation: 1081

Found issue: Reason: caused by errorHandler handler which was specified on listener level. In some cases error handler return null - which was causing infinite loop (instead rethrowing exception and rollback transaction)

Upvotes: 0

Gary Russell
Gary Russell

Reputation: 174719

Set defaultRequeueRejected to false on the container factory.

To programmatically decide when to requeue or nor, leave that at true and throw an AmqpRejectAndDontRequeueException when you don't want it requeued.

EDIT

There's something not adding up...

protected void prepareHolderForRollback(RabbitResourceHolder resourceHolder, RuntimeException exception) {
    if (resourceHolder != null) {
        resourceHolder.setRequeueOnRollback(isAlwaysRequeueWithTxManagerRollback() ||
                RabbitUtils.shouldRequeue(isDefaultRequeueRejected(), exception, logger));
    }
}

If both booleans are false, we don't requeue.

Upvotes: 1

Related Questions