
Reputation: 25

Spring AMQP - Message re-queuing using dead letter mechanism with TTL

It's like "Houston, we have a problem here": I need to delay a message for 5 minutes after the first attempt to process an event fails. I have implemented a dead letter exchange for this scenario.

On failure, the messages are routed to the DLX --> Retry Queue and come back to the work queue after a TTL of 5 minutes for another attempt.

Here is the configuration I am using:

public class RabbitMQConfig {
    @Bean(name = "work")
    Queue workQueue() {
        return new Queue(WORK_QUEUE, true, false, false, null);

    @Bean(name = "workExchange")
    TopicExchange workExchange() {
        return new TopicExchange(WORK_EXCHANGE, true, false);

    Binding workBinding(Queue queue, TopicExchange exchange) {
        return BindingBuilder.bind(workQueue()).to(workExchange()).with("#");

    @Bean(name = "retryExchange")
    FanoutExchange retryExchange() {
        return new FanoutExchange(RETRY_EXCHANGE, true, false);

    @Bean(name = "retry")
    Queue retryQueue() {
        Map<String, Object> args = new HashMap<String, Object>();
        args.put("x-dead-letter-exchange", WORK_EXCHANGE);
        args.put("x-message-ttl", RETRY_DELAY); //delay of 5 min
        return new Queue(RETRY_QUEUE, true, false, false, args);

    Binding retryBinding(Queue queue,FanoutExchange exchange) {
        return BindingBuilder.bind(retryQueue()).to(retryExchange());

    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        return factory;

    Consumer receiver() {
        return new Consumer();

    MessageListenerAdapter listenerAdapter(Consumer receiver) {
        return new MessageListenerAdapter(receiver, "receiveMessage");


@GetMapping(path = "/hello")
public String sayHello() {
    // Producer operation

        String messages[];
        messages = new String[] {" hello "};

    for (int i = 0; i < 5; i++) {
        String message = util.getMessage(messages)+i;

        rabbitTemplate.convertAndSend("WorkExchange","", message);
       System.out.println(" Sent '" + message + "'");
    return "hello";


public class Consumer {

    @RabbitListener(queues = "WorkQueue")
    public void receiveMessage(String message, Channel channel,
            @Header(AmqpHeaders.DELIVERY_TAG) Long tag) throws IOException, InterruptedException {

        try {

            System.out.println("message to be processed: " + message);
            channel.basicAck(tag, false);

        } catch (Exception e) {
            System.out.println("In the exception catch block");
            System.out.println("message in dead letter exchange: " + message);
            channel.basicPublish("RetryExchange", "", null, message.getBytes());



    private void doWorkTwo(String task) throws InterruptedException {

        int c = 0;
        int b = 5;
        int d = b / c;



Is this the correct way to use a dead letter exchange for my scenario? Also, after a message has waited once in the RETRY QUEUE for 5 minutes, on the second attempt it does not wait for 5 minutes in the RETRY QUEUE (I have set the TTL to 5 minutes) and instead moves to the WORK QUEUE immediately.

I am running this application by hitting the localhost:8080/hello URL.

Here is my updated configuration.


public class RabbitMQConfig {

    final static String WORK_QUEUE = "WorkQueue";
    final static String RETRY_QUEUE = "RetryQueue";
    final static String WORK_EXCHANGE = "WorkExchange"; // Dead Letter Exchange
    final static String RETRY_EXCHANGE = "RetryExchange";
    final static int RETRY_DELAY = 60000; // in ms (1 min)

    @Bean(name = "work")
    Queue workQueue() {
         Map<String, Object> args = new HashMap<String, Object>();
         args.put("x-dead-letter-exchange", RETRY_EXCHANGE);
        return new Queue(WORK_QUEUE, true, false, false, args);

    @Bean(name = "workExchange")
    DirectExchange workExchange() {
        return new DirectExchange(WORK_EXCHANGE, true, false);

    Binding workBinding(Queue queue, DirectExchange exchange) {
        return BindingBuilder.bind(workQueue()).to(workExchange()).with("");

    @Bean(name = "retryExchange")
    DirectExchange retryExchange() {
        return new DirectExchange(RETRY_EXCHANGE, true, false);

    // Messages will drop off RetryQueue into WorkExchange for re-processing
    // All messages in queue will expire at same rate
    @Bean(name = "retry")
    Queue retryQueue() {
        Map<String, Object> args = new HashMap<String, Object>();
        //args.put("x-dead-letter-exchange", WORK_EXCHANGE);
        //args.put("x-message-ttl", RETRY_DELAY);
        return new Queue(RETRY_QUEUE, true, false, false, null);

    Binding retryBinding(Queue queue, DirectExchange exchange) {
        return BindingBuilder.bind(retryQueue()).to(retryExchange()).with("");

    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        /*factory.setAdviceChain(new Advice[] {
                        .maxAttempts(2).recoverer(new RejectAndDontRequeueRecoverer())
                        .backOffOptions(1000, 2, 5000)
        return factory;

    Consumer receiver() {
        return new Consumer();

    MessageListenerAdapter listenerAdapter(Consumer receiver) {
        return new MessageListenerAdapter(receiver, "receiveMessage");


public class Consumer {

    @RabbitListener(queues = "WorkQueue")
    public void receiveMessage(String message, Channel channel,
            @Header(AmqpHeaders.DELIVERY_TAG) Long tag,
            @Header(required = false, name = "x-death") HashMap<String, String> xDeath)
            throws IOException, InterruptedException {

        channel.basicAck(tag, false);

    private void doWorkTwo(String task) {
        int c = 0;
        int b = 5;
        if (c < b) {
            throw new AmqpRejectAndDontRequeueException(task);

Upvotes: 2

Views: 6964

Answers (1)

Gary Russell
Gary Russell

Reputation: 174809

If you reject the message so the broker routes it to a DLQ, you can examine the x-death header. In this scenario, I have a DLQ with a TTL of 5 seconds and the consumer of the message from the main queue rejects it; the broker routes it to the DLQ, then it expires and is routed back to the main queue - the x-death header shows the number of re-routing operations:

x-death header

Upvotes: 2

Related Questions