Reputation: 1081
Hi I have problem with my Rabbit listener which cause infinite loop on exception (requeue message). My configuration looks:
@Bean(name = "defContainer")
public RabbitListenerContainerFactory containerFactory(ConnectionFactory connectionFactory, PlatformTransactionManager transactionManager){
SimpleRabbitListenerContainerFactory containerFactory = new SimpleRabbitListenerContainerFactory();
containerFactory.setConnectionFactory(connectionFactory);
containerFactory.setConcurrentConsumers(5);
containerFactory.setAcknowledgeMode(AcknowledgeMode.AUTO);
containerFactory.setTransactionManager(transactionManager);
containerFactory.setMessageConverter(messageConverterAmqp());
containerFactory.setDefaultRequeueRejected(false);
return new TxRabbitListenerContainerFactory(containerFactory);
}
where transactionManager is JpaTransactionManager for transaction on postgre db. TxRabbitListenerContainerFactory is my factory which set setAlwaysRequeueWithTxManagerRollback to false:
public class TxRabbitListenerContainerFactory implements RabbitListenerContainerFactory {
private SimpleRabbitListenerContainerFactory factory;
public TxRabbitListenerContainerFactory(SimpleRabbitListenerContainerFactory factory) {
this.factory = factory;
}
@Override
public MessageListenerContainer createListenerContainer(RabbitListenerEndpoint endpoint) {
SimpleMessageListenerContainer container = factory.createListenerContainer(endpoint);
container.setAlwaysRequeueWithTxManagerRollback(false);
return container;
}
}
Now I have listner like:
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "topic.two", durable = "true"),
exchange = @Exchange(value = "topic.def", type = "topic", durable = "true"),
key = "letter.*"
), errorHandler = "rabErrorHandler", containerFactory = "defContainer")
@Transactional
public Motorcycle topicLetters(Motorcycle motorcycle) throws Exception{
motorcycle.setId(UUID.randomUUID().toString());
Testing testing = new Testing();
testingRepository.save(testing);
throwEx();
return motorcycle;
}
where method throwEx(); throw unchecked exception. Data from DB are properly rollbacked (not commited), but message are constantly requeued, see it in listener:
@Bean
public RabbitListenerErrorHandler rabErrorHandler(){
return new RabbitListenerErrorHandler() {
@Override
public Object handleError(Message message, org.springframework.messaging.Message<?> message1, ListenerExecutionFailedException e) throws Exception {
System.out.println("FFFFFFFFFFF");
return null;
}
};
}
How to prevvent infinite loope, and why is it happend ?
EDIT:
Logs: pasted logs
Upvotes: 2
Views: 2462
Reputation: 1081
Found issue: Reason: caused by errorHandler handler which was specified on listener level. In some cases error handler return null - which was causing infinite loop (instead rethrowing exception and rollback transaction)
Upvotes: 0
Reputation: 174719
Set defaultRequeueRejected
to false
on the container factory.
To programmatically decide when to requeue or nor, leave that at true
and throw an AmqpRejectAndDontRequeueException
when you don't want it requeued.
EDIT
There's something not adding up...
protected void prepareHolderForRollback(RabbitResourceHolder resourceHolder, RuntimeException exception) {
if (resourceHolder != null) {
resourceHolder.setRequeueOnRollback(isAlwaysRequeueWithTxManagerRollback() ||
RabbitUtils.shouldRequeue(isDefaultRequeueRejected(), exception, logger));
}
}
If both booleans are false, we don't requeue.
Upvotes: 1