abullor
abullor

Reputation: 53

RabbitMQ delayed message when there is an exception

I'm using the plugin (https://blog.rabbitmq.com/posts/2015/04/scheduling-messages-with-rabbitmq) and it works well. I send a message with X seconds of delay, and it's processed with X seconds of delay.

The problem is in the process logic. If it works well (happy path) I don't have issues. But if the process fails, what I expect is that the message is requeued to be processed with the same delay, but it is processed inmediately.

Is there a way to requeue the message automatically with the original specified delay in case of an exception?

Upvotes: 5

Views: 2909

Answers (1)

Gary Russell
Gary Russell

Reputation: 174494

No; the delay is not applicable for retries; if message order is not important, you can re-publish the message to the tail of the queue.

Or, you can configure a retry interceptor with a fixed back off.

https://docs.spring.io/spring-amqp/docs/current/reference/html/#retry

Spring Retry provides a couple of AOP interceptors and a great deal of flexibility to specify the parameters of the retry (number of attempts, exception types, backoff algorithm, and others). Spring AMQP also provides some convenience factory beans for creating Spring Retry interceptors in a convenient form for AMQP use cases, with strongly typed callback interfaces that you can use to implement custom recovery logic. ...

EDIT

Using a RabbitTemplate instead of a message-driven listener:

@SpringBootApplication
@EnableScheduling
@EnableTransactionManagement
public class So69020120Application {

    public static void main(String[] args) {
        SpringApplication.run(So69020120Application.class, args);
    }

    @Autowired
    Processor processor;

    @Scheduled(fixedDelay = 5000)
    public void sched() {
        try {
            while (this.processor.process()) {
            }
        }
        catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Bean
    RabbitTransactionManager transactionManager(ConnectionFactory cf) {
        return new RabbitTransactionManager(cf);
    }

    @Bean
    public ApplicationRunner runner(RabbitTemplate template) {
        return args -> {
            IntStream.range(0,  4).forEach(i -> template.convertAndSend("queue", "good"));
            template.convertAndSend("queue", "fail");
            IntStream.range(0,  4).forEach(i -> template.convertAndSend("queue", "good"));
        };
    }

    @Bean
    Queue queue() {
        return new Queue("queue");
    }

}

@Component
class Processor {

    private final RabbitTemplate template;

    private final AtomicBoolean fail = new AtomicBoolean(true);

    Processor(RabbitTemplate template) {
        this.template = template;
        template.setChannelTransacted(true);
    }

    @Transactional
    public boolean process() {
        String data = (String) template.receiveAndConvert("queue");
        if (data == null) {
            System.out.println("No More Messages");
            return false;
        }
        System.out.println(data);
        if (data.equals("fail") && this.fail.getAndSet(false)) {
            throw new RuntimeException("test");
        }
        return true;
    }

}
good
good
good
good
fail
java.lang.RuntimeException: test
    at com.example.demo.Processor.process(So69020120Application.java:86)
    at com.example.demo.Processor$$FastClassBySpringCGLIB$$6adeaa38.invoke(<generated>)
    at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:779)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:750)
    at org.springframework.transaction.interceptor.TransactionInterceptor$1.proceedWithInvocation(TransactionInterceptor.java:123)
    at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:388)
    at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:119)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:750)
    at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:692)
    at com.example.demo.Processor$$EnhancerBySpringCGLIB$$bad30db1.process(<generated>)
    at com.example.demo.So69020120Application.sched(So69020120Application.java:36)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.springframework.scheduling.support.ScheduledMethodRunnable.run(ScheduledMethodRunnable.java:84)
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
fail
good
good
good
good
No More Messages

Upvotes: 2

Related Questions