Reputation: 158
I have setup dlq and dlx, but failed message is not redirecting to dlq. I am trying to send message from java app as well as from rabbitmq server to MESSAGES.EXCHANGE, in both case i am getting message but after throwing the exception message should redirect to DLX.MESSAGES.EXCHANGE but it is happening.
Below is java code and screen shot of rabbitmq serer. everything looks right to me. could not find any problem in code or in rabbitmq server.
Queue setup code -
public class DLQAmqpConfiguration {
public static final String DLX_MESSAGES_EXCHANGE = "DLX.MESSAGES.EXCHANGE";
public static final String DLQ_MESSAGES_QUEUE = "DLQ.MESSAGES.QUEUE";
public static final String MESSAGES_QUEUE = "MESSAGES.QUEUE";
public static final String MESSAGES_EXCHANGE = "MESSAGES.EXCHANGE";
public static final String ROUTING_KEY_MESSAGES_QUEUE = "ROUTING_KEY_MESSAGES_QUEUE";
@Bean
Queue messagesQueue() {
return QueueBuilder.durable(MESSAGES_QUEUE)
.withArgument("x-dead-letter-exchange", DLX_MESSAGES_EXCHANGE)
.build();
}
@Bean
DirectExchange messagesExchange() {
return new DirectExchange(MESSAGES_EXCHANGE);
}
@Bean
Binding bindingMessages() {
return BindingBuilder.bind(messagesQueue()).to(messagesExchange()).with(ROUTING_KEY_MESSAGES_QUEUE);
}
@Bean
FanoutExchange deadLetterExchange() {
return new FanoutExchange(DLX_MESSAGES_EXCHANGE);
}
@Bean
Queue deadLetterQueue() {
return QueueBuilder.durable(DLQ_MESSAGES_QUEUE).build();
}
@Bean
Binding deadLetterBinding() {
return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange());
}
}
Producer -
this.template.convertAndSend(DLQAmqpConfiguration.MESSAGES_EXCHANGE,
DLQAmqpConfiguration.ROUTING_KEY_MESSAGES_QUEUE, message);
Cunsumer -
@RabbitListener(queues = DLQAmqpConfiguration.MESSAGES_QUEUE)
public void receiveMessage(Message message) throws BusinessException {
System.out.println("Received failed message, re-queueing: " + message.toString());
System.out.println("Received failed message, re-queueing: " + message.getMessageProperties().getReceivedRoutingKey());
throw new BusinessException();
}
// this code never running
@RabbitListener(queues = DLQAmqpConfiguration.DLQ_MESSAGES_QUEUE)
public void processFailedMessages(Message message) {
System.out.println("Received failed message: " + message.toString());
}
QUEUE -
Logs -
Received failed message, re-queueing: (Body:'[B@55c36bc9(byte[26])' MessageProperties [headers={}, contentLength=0, receivedDeliveryMode=PERSISTENT, redelivered=true, receivedExchange=MESSAGES.EXCHANGE, receivedRoutingKey=ROUTING_KEY_MESSAGES_QUEUE, deliveryTag=5444, consumerTag=amq.ctag-KrxkDPlc_uoqHOx_bbnvnA, consumerQueue=MESSAGES.QUEUE])
Received failed message, re-queueing: ROUTING_KEY_MESSAGES_QUEUE
2020-08-27 21:36:33.460 WARN 13192 --- [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed.
org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Listener method 'public void com.example.rabbitmq.RabbitmqApplication.receiveMessage(org.springframework.amqp.core.Message) throws com.example.rabbitmq.errorhandler.BusinessException' threw exception
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:228) ~[spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandlerAndProcessResult(MessagingMessageListenerAdapter.java:148) ~[spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:133) ~[spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1591) ~[spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1510) ~[spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1498) ~[spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1489) ~[spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1433) ~[spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:970) [spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:916) [spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:83) [spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1291) [spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1197) [spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_131]
Caused by: com.example.rabbitmq.errorhandler.BusinessException: null
Upvotes: 2
Views: 4421
Reputation: 174554
You have to set spring.rabbitmq.listener.simple.default-requeue-rejected=false
(or ...direct...
if using the direct container instead of the simple container) or throw AmqpRejectAndDontRequeueException
.
Otherwise, the failed message will be requeued and redelivered.
@SpringBootApplication
public class So63620066Application {
public static void main(String[] args) {
SpringApplication.run(So63620066Application.class, args);
}
public static final String DLX_MESSAGES_EXCHANGE = "DLX.MESSAGES.EXCHANGE";
public static final String DLQ_MESSAGES_QUEUE = "DLQ.MESSAGES.QUEUE";
public static final String MESSAGES_QUEUE = "MESSAGES.QUEUE";
public static final String MESSAGES_EXCHANGE = "MESSAGES.EXCHANGE";
public static final String ROUTING_KEY_MESSAGES_QUEUE = "ROUTING_KEY_MESSAGES_QUEUE";
@Bean
Queue messagesQueue() {
return QueueBuilder.durable(MESSAGES_QUEUE)
.withArgument("x-dead-letter-exchange", DLX_MESSAGES_EXCHANGE)
.build();
}
@Bean
DirectExchange messagesExchange() {
return new DirectExchange(MESSAGES_EXCHANGE);
}
@Bean
Binding bindingMessages() {
return BindingBuilder.bind(messagesQueue()).to(messagesExchange()).with(ROUTING_KEY_MESSAGES_QUEUE);
}
@Bean
FanoutExchange deadLetterExchange() {
return new FanoutExchange(DLX_MESSAGES_EXCHANGE);
}
@Bean
Queue deadLetterQueue() {
return QueueBuilder.durable(DLQ_MESSAGES_QUEUE).build();
}
@Bean
Binding deadLetterBinding() {
return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange());
}
@RabbitListener(queues = MESSAGES_QUEUE)
public void receiveMessage(Message message) {
System.out.println("Received failed message, re-queueing: " + message.toString());
System.out.println(
"Received failed message, re-queueing: " + message.getMessageProperties().getReceivedRoutingKey());
throw new RuntimeException("fail");
}
@RabbitListener(queues = DLQ_MESSAGES_QUEUE)
public void processFailedMessages(Message message) {
System.out.println("Received failed message: " + message.toString());
}
@Bean
public ApplicationRunner runner(RabbitTemplate template) {
return args -> {
template.convertAndSend(MESSAGES_EXCHANGE,
ROUTING_KEY_MESSAGES_QUEUE, "foo");
};
}
}
spring.rabbitmq.listener.simple.default-requeue-rejected=false
Received failed message, re-queueing: ROUTING_KEY_MESSAGES_QUEUE
2020-08-27 12:49:41.056 WARN 11489 --- [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed.
org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Listener method 'public void com.example.demo.So63620066Application.receiveMessage(org.springframework.amqp.core.Message)' threw exception
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:228) ~[spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandlerAndProcessResult(MessagingMessageListenerAdapter.java:148) ~[spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:133) ~[spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1591) ~[spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1510) ~[spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1498) ~[spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1489) ~[spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1433) ~[spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:970) [spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:916) [spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:83) [spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1291) [spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1197) [spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_212]
Caused by: java.lang.RuntimeException: fail
at com.example.demo.So63620066Application.receiveMessage(So63620066Application.java:71) ~[classes/:na]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_212]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_212]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_212]
at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_212]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:171) ~[spring-messaging-5.2.8.RELEASE.jar:5.2.8.RELEASE]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120) ~[spring-messaging-5.2.8.RELEASE.jar:5.2.8.RELEASE]
at org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:53) ~[spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:220) ~[spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
... 13 common frames omitted
Received failed message: (Body:'foo' MessageProperties [headers={x-first-death-exchange=MESSAGES.EXCHANGE, x-death=[{reason=rejected, count=1, exchange=MESSAGES.EXCHANGE, time=Thu Aug 27 12:49:41 EDT 2020, routing-keys=[ROUTING_KEY_MESSAGES_QUEUE], queue=MESSAGES.QUEUE}], x-first-death-reason=rejected, x-first-death-queue=MESSAGES.QUEUE}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=DLX.MESSAGES.EXCHANGE, receivedRoutingKey=ROUTING_KEY_MESSAGES_QUEUE, deliveryTag=3, consumerTag=amq.ctag--VIXT0V3hhBrlfTFqI5uxg, consumerQueue=DLQ.MESSAGES.QUEUE])
Upvotes: 7