Reputation: 53
I'm using the RabbitMQ delayed message plugin (https://blog.rabbitmq.com/posts/2015/04/scheduling-messages-with-rabbitmq) and it works well. When I send a message with a delay of X seconds, it is processed X seconds later.
The problem is in the processing logic. On the happy path I have no issues. But if processing fails, I expect the message to be requeued and processed again after the same delay; instead, it is processed immediately.
Is there a way to requeue the message automatically with the original specified delay in case of an exception?
Upvotes: 5
Views: 2909
Reputation: 174494
No; the delay applies only when a message is published through the delayed exchange, not when it is requeued after a failure. If message order is not important, you can re-publish the message to the tail of the queue instead.
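A rough sketch of the re-publish idea, in plain Java so it runs without a broker: catch the processing failure and hand the payload back to a publisher with the same delay, instead of letting the broker requeue it. The names DelayedRepublisher, DelayedPublisher and handle are hypothetical; in a real application the publisher would presumably send to the delayed exchange with the plugin's x-delay header set.

```java
import java.util.function.Consumer;

public class DelayedRepublisher {

    /** Hypothetical abstraction: "publish this payload with this delay". */
    public interface DelayedPublisher {
        void publish(String payload, long delayMillis);
    }

    private final DelayedPublisher publisher;
    private final long delayMillis;

    public DelayedRepublisher(DelayedPublisher publisher, long delayMillis) {
        this.publisher = publisher;
        this.delayMillis = delayMillis;
    }

    /**
     * Runs the processor; if it throws, re-publishes the payload with the
     * configured delay and returns normally, so the caller treats the
     * original delivery as handled.
     */
    public void handle(String payload, Consumer<String> processor) {
        try {
            processor.accept(payload);
        } catch (RuntimeException e) {
            // Do not rethrow: a rethrow would make the broker requeue immediately.
            publisher.publish(payload, delayMillis);
        }
    }
}
```

Returning normally lets the listener acknowledge the original delivery, so only the delayed copy remains. A real implementation would probably also cap the number of retries, since this sketch re-publishes on every failure.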
Or, you can configure a retry interceptor with a fixed back off.
https://docs.spring.io/spring-amqp/docs/current/reference/html/#retry
Spring Retry provides a couple of AOP interceptors and a great deal of flexibility to specify the parameters of the retry (number of attempts, exception types, backoff algorithm, and others). Spring AMQP also provides some convenience factory beans for creating Spring Retry interceptors in a convenient form for AMQP use cases, with strongly typed callback interfaces that you can use to implement custom recovery logic. ...
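To make "fixed back off" concrete, here is a plain-Java sketch of the behaviour such an interceptor gives you: invoke the handler and, on failure, wait a fixed interval before trying again, up to a maximum number of attempts. It uses only the JDK; the class and method names (FixedBackOffRetry, call) are hypothetical, and this is not how Spring Retry itself is implemented.

```java
import java.util.concurrent.Callable;

public class FixedBackOffRetry {

    /**
     * Invokes the task up to maxAttempts times, sleeping backOffMillis
     * after each failed attempt except the last. Rethrows the last
     * exception once the attempts are exhausted.
     */
    public static <T> T call(Callable<T> task, int maxAttempts, long backOffMillis)
            throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return task.call();
            } catch (Exception e) {
                last = e;
                if (attempt < maxAttempts) {
                    // Fixed back off: the same pause before every retry.
                    Thread.sleep(backOffMillis);
                }
            }
        }
        throw last;
    }
}
```

With a back-off equal to the original delay, for example FixedBackOffRetry.call(task, 3, 5000), each retry happens roughly that long after the previous failure. Note that the retrying thread is blocked while it waits.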
EDIT
Using a RabbitTemplate instead of a message-driven listener:
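package com.example.demo;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.IntStream;

import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.transaction.RabbitTransactionManager;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.EnableTransactionManagement;
import org.springframework.transaction.annotation.Transactional;

@SpringBootApplication
@EnableScheduling
@EnableTransactionManagement
public class So69020120Application {

    public static void main(String[] args) {
        SpringApplication.run(So69020120Application.class, args);
    }

    @Autowired
    Processor processor;

    // Poll the queue 5 seconds after the previous run finished.
    @Scheduled(fixedDelay = 5000)
    public void sched() {
        try {
            while (this.processor.process()) {
                // keep draining until the queue is empty or processing fails
            }
        }
        catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Bean
    RabbitTransactionManager transactionManager(ConnectionFactory cf) {
        return new RabbitTransactionManager(cf);
    }

    // Test data: four good messages, one that fails once, four more good ones.
    @Bean
    public ApplicationRunner runner(RabbitTemplate template) {
        return args -> {
            IntStream.range(0, 4).forEach(i -> template.convertAndSend("queue", "good"));
            template.convertAndSend("queue", "fail");
            IntStream.range(0, 4).forEach(i -> template.convertAndSend("queue", "good"));
        };
    }

    @Bean
    Queue queue() {
        return new Queue("queue");
    }

}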

@Component
class Processor {

    private final RabbitTemplate template;

    // Makes the "fail" message fail only the first time it is received.
    private final AtomicBoolean fail = new AtomicBoolean(true);

    Processor(RabbitTemplate template) {
        this.template = template;
        template.setChannelTransacted(true);
    }

    // The receive takes part in the transaction, so an exception rolls it back.
    @Transactional
    public boolean process() {
        String data = (String) template.receiveAndConvert("queue");
        if (data == null) {
            System.out.println("No More Messages");
            return false;
        }
        System.out.println(data);
        if (data.equals("fail") && this.fail.getAndSet(false)) {
            throw new RuntimeException("test");
        }
        return true;
    }

}

Result (the failed receive is rolled back, so the message returns to the queue and is received again on the next scheduled run, 5 seconds later):
good
good
good
good
fail
java.lang.RuntimeException: test
at com.example.demo.Processor.process(So69020120Application.java:86)
at com.example.demo.Processor$$FastClassBySpringCGLIB$$6adeaa38.invoke(<generated>)
at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:779)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:750)
at org.springframework.transaction.interceptor.TransactionInterceptor$1.proceedWithInvocation(TransactionInterceptor.java:123)
at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:388)
at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:119)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:750)
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:692)
at com.example.demo.Processor$$EnhancerBySpringCGLIB$$bad30db1.process(<generated>)
at com.example.demo.So69020120Application.sched(So69020120Application.java:36)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.springframework.scheduling.support.ScheduledMethodRunnable.run(ScheduledMethodRunnable.java:84)
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
fail
good
good
good
good
No More Messages
Upvotes: 2