Arun

Reputation: 158

SpringBoot + Rabbitmq - DLQ queue not working

I have set up a DLQ and a DLX, but failed messages are not being redirected to the DLQ. I send messages to MESSAGES.EXCHANGE both from the Java app and from the RabbitMQ server. In both cases the listener receives the message, but after it throws the exception the message should be redirected to DLX.MESSAGES.EXCHANGE, and that is not happening.

Below are the Java code and screenshots of the RabbitMQ server. Everything looks right to me; I could not find any problem in the code or on the RabbitMQ server.

Queue setup code -

public class DLQAmqpConfiguration {
    public static final String DLX_MESSAGES_EXCHANGE = "DLX.MESSAGES.EXCHANGE";
    public static final String DLQ_MESSAGES_QUEUE = "DLQ.MESSAGES.QUEUE";

    public static final String MESSAGES_QUEUE = "MESSAGES.QUEUE";
    public static final String MESSAGES_EXCHANGE = "MESSAGES.EXCHANGE";
    public static final String ROUTING_KEY_MESSAGES_QUEUE = "ROUTING_KEY_MESSAGES_QUEUE";

    @Bean
    Queue messagesQueue() {
        return QueueBuilder.durable(MESSAGES_QUEUE)
                .withArgument("x-dead-letter-exchange", DLX_MESSAGES_EXCHANGE)
                .build();
    }

    @Bean
    DirectExchange messagesExchange() {
        return new DirectExchange(MESSAGES_EXCHANGE);
    }

    @Bean
    Binding bindingMessages() {
        return BindingBuilder.bind(messagesQueue()).to(messagesExchange()).with(ROUTING_KEY_MESSAGES_QUEUE);
    }

    @Bean
    FanoutExchange deadLetterExchange() {
        return new FanoutExchange(DLX_MESSAGES_EXCHANGE);
    }

    @Bean
    Queue deadLetterQueue() {
        return QueueBuilder.durable(DLQ_MESSAGES_QUEUE).build();
    }

    @Bean
    Binding deadLetterBinding() {
        return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange());
    }
}

Producer -

    this.template.convertAndSend(DLQAmqpConfiguration.MESSAGES_EXCHANGE,
                DLQAmqpConfiguration.ROUTING_KEY_MESSAGES_QUEUE, message);

Consumer -

    @RabbitListener(queues = DLQAmqpConfiguration.MESSAGES_QUEUE)
    public void receiveMessage(Message message) throws BusinessException {
        System.out.println("Received failed message, re-queueing: " + message.toString());
        System.out.println("Received failed message, re-queueing: " + message.getMessageProperties().getReceivedRoutingKey());
        throw new BusinessException();
    }

    // this code never runs
    @RabbitListener(queues = DLQAmqpConfiguration.DLQ_MESSAGES_QUEUE)
    public void processFailedMessages(Message message) {
        System.out.println("Received failed message: " + message.toString());
    }

Exchange - [screenshot not included]

QUEUE - [two screenshots not included]

Logs -

Received failed message, re-queueing: (Body:'[B@55c36bc9(byte[26])' MessageProperties [headers={}, contentLength=0, receivedDeliveryMode=PERSISTENT, redelivered=true, receivedExchange=MESSAGES.EXCHANGE, receivedRoutingKey=ROUTING_KEY_MESSAGES_QUEUE, deliveryTag=5444, consumerTag=amq.ctag-KrxkDPlc_uoqHOx_bbnvnA, consumerQueue=MESSAGES.QUEUE])
Received failed message, re-queueing: ROUTING_KEY_MESSAGES_QUEUE
2020-08-27 21:36:33.460  WARN 13192 --- [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed.

org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Listener method 'public void com.example.rabbitmq.RabbitmqApplication.receiveMessage(org.springframework.amqp.core.Message) throws com.example.rabbitmq.errorhandler.BusinessException' threw exception
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:228) ~[spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandlerAndProcessResult(MessagingMessageListenerAdapter.java:148) ~[spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:133) ~[spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1591) ~[spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1510) ~[spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1498) ~[spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1489) ~[spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1433) ~[spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:970) [spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:916) [spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:83) [spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1291) [spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1197) [spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_131]
Caused by: com.example.rabbitmq.errorhandler.BusinessException: null

Upvotes: 2

Views: 4421

Answers (1)

Gary Russell

Reputation: 174554

You have to set spring.rabbitmq.listener.simple.default-requeue-rejected=false (or ...direct... if using the direct container instead of the simple container) or throw AmqpRejectAndDontRequeueException.

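Otherwise, the failed message will be requeued and redelivered to the same listener instead of being dead-lettered, which is why the log in the question shows redelivered=true.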
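To make that rule concrete, here is a toy model of the requeue decision in plain Java. It is only a sketch and does not use Spring AMQP: the class RequeueDecisionSketch, the methods shouldRequeue and destination, and the stand-in exception RejectAndDontRequeue are hypothetical names made up for illustration, and the real container logic handles more cases than this. It assumes only what is described above: a failed delivery goes back to the original queue unless default-requeue-rejected is false or the exception chain contains the reject-and-don't-requeue exception.

```java
public class RequeueDecisionSketch {

    /** Hypothetical stand-in for Spring AMQP's AmqpRejectAndDontRequeueException. */
    static class RejectAndDontRequeue extends RuntimeException {
        RejectAndDontRequeue(String message, Throwable cause) {
            super(message, cause);
        }
    }

    /**
     * Simplified decision: requeue when the container default says so,
     * unless the exception chain contains the "don't requeue" marker.
     */
    static boolean shouldRequeue(boolean defaultRequeueRejected, Throwable failure) {
        for (Throwable t = failure; t != null; t = t.getCause()) {
            if (t instanceof RejectAndDontRequeue) {
                return false;
            }
        }
        return defaultRequeueRejected;
    }

    /** Where the failed message ends up, using the queue names from the question. */
    static String destination(boolean defaultRequeueRejected, Throwable failure) {
        return shouldRequeue(defaultRequeueRejected, failure)
                ? "MESSAGES.QUEUE"       // redelivered to the same listener
                : "DLQ.MESSAGES.QUEUE";  // dead-lettered via DLX.MESSAGES.EXCHANGE
    }

    public static void main(String[] args) {
        Throwable plain = new RuntimeException("fail");
        Throwable marked = new RejectAndDontRequeue("fail", plain);

        System.out.println(destination(true, plain));   // default setting, ordinary exception
        System.out.println(destination(false, plain));  // default-requeue-rejected=false
        System.out.println(destination(true, marked));  // marker exception, default setting
    }
}
```

Only the first case, an ordinary exception with the default setting, sends the message back to MESSAGES.QUEUE, which is the situation in the question. In the real listener, the second option corresponds to throwing AmqpRejectAndDontRequeueException (for example wrapping the BusinessException as its cause) instead of throwing the business exception directly.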

@SpringBootApplication
public class So63620066Application {

    public static void main(String[] args) {
        SpringApplication.run(So63620066Application.class, args);
    }

    public static final String DLX_MESSAGES_EXCHANGE = "DLX.MESSAGES.EXCHANGE";

    public static final String DLQ_MESSAGES_QUEUE = "DLQ.MESSAGES.QUEUE";

    public static final String MESSAGES_QUEUE = "MESSAGES.QUEUE";

    public static final String MESSAGES_EXCHANGE = "MESSAGES.EXCHANGE";

    public static final String ROUTING_KEY_MESSAGES_QUEUE = "ROUTING_KEY_MESSAGES_QUEUE";

    @Bean
    Queue messagesQueue() {
        return QueueBuilder.durable(MESSAGES_QUEUE)
                .withArgument("x-dead-letter-exchange", DLX_MESSAGES_EXCHANGE)
                .build();
    }

    @Bean
    DirectExchange messagesExchange() {
        return new DirectExchange(MESSAGES_EXCHANGE);
    }

    @Bean
    Binding bindingMessages() {
        return BindingBuilder.bind(messagesQueue()).to(messagesExchange()).with(ROUTING_KEY_MESSAGES_QUEUE);
    }

    @Bean
    FanoutExchange deadLetterExchange() {
        return new FanoutExchange(DLX_MESSAGES_EXCHANGE);
    }

    @Bean
    Queue deadLetterQueue() {
        return QueueBuilder.durable(DLQ_MESSAGES_QUEUE).build();
    }

    @Bean
    Binding deadLetterBinding() {
        return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange());
    }

    @RabbitListener(queues = MESSAGES_QUEUE)
    public void receiveMessage(Message message) {
        System.out.println("Received failed message, re-queueing: " + message.toString());
        System.out.println(
                "Received failed message, re-queueing: " + message.getMessageProperties().getReceivedRoutingKey());
        throw new RuntimeException("fail");
    }

    @RabbitListener(queues = DLQ_MESSAGES_QUEUE)
    public void processFailedMessages(Message message) {
        System.out.println("Received failed message: " + message.toString());
    }

    @Bean
    public ApplicationRunner runner(RabbitTemplate template) {
        return args -> {
            template.convertAndSend(MESSAGES_EXCHANGE,
                    ROUTING_KEY_MESSAGES_QUEUE, "foo");
        };
    }

}

application.properties -

spring.rabbitmq.listener.simple.default-requeue-rejected=false

Logs -

Received failed message, re-queueing: ROUTING_KEY_MESSAGES_QUEUE
2020-08-27 12:49:41.056  WARN 11489 --- [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed.

org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Listener method 'public void com.example.demo.So63620066Application.receiveMessage(org.springframework.amqp.core.Message)' threw exception
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:228) ~[spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandlerAndProcessResult(MessagingMessageListenerAdapter.java:148) ~[spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:133) ~[spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1591) ~[spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1510) ~[spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1498) ~[spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1489) ~[spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1433) ~[spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:970) [spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:916) [spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:83) [spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1291) [spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1197) [spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_212]
Caused by: java.lang.RuntimeException: fail
    at com.example.demo.So63620066Application.receiveMessage(So63620066Application.java:71) ~[classes/:na]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_212]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_212]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_212]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_212]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:171) ~[spring-messaging-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120) ~[spring-messaging-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:53) ~[spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:220) ~[spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
    ... 13 common frames omitted

Received failed message: (Body:'foo' MessageProperties [headers={x-first-death-exchange=MESSAGES.EXCHANGE, x-death=[{reason=rejected, count=1, exchange=MESSAGES.EXCHANGE, time=Thu Aug 27 12:49:41 EDT 2020, routing-keys=[ROUTING_KEY_MESSAGES_QUEUE], queue=MESSAGES.QUEUE}], x-first-death-reason=rejected, x-first-death-queue=MESSAGES.QUEUE}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=DLX.MESSAGES.EXCHANGE, receivedRoutingKey=ROUTING_KEY_MESSAGES_QUEUE, deliveryTag=3, consumerTag=amq.ctag--VIXT0V3hhBrlfTFqI5uxg, consumerQueue=DLQ.MESSAGES.QUEUE])

Upvotes: 7
