Chandan
Chandan

Reputation: 353

Message Retry policies in Spring AMQP

I am using spring-amqp to consume messages from RabbitMQ in my web application. Web application consists of multiple components in it such as (Redis, OracleDB)

Now i have a scenario, if any exception occurs due to infrastructure like Oracle server is down, Redis connection issue, i want to push message back to the same queue and after certain specified delay i want to consume the message back.

After certain delay then also the message is leading to same exception, probably i want to use maximum attempts option or do the same as above push the message back to queue and send a mail to administrator stating "Infrastructure Issue".

Does Spring AMQP supports above scenario.? If yes please provide me how to come up with such or similar solutions.

I tried below piece of code. Message is not going for dead letter queue instead it is re-queuing to same queue causing infinite loop. Please correct me where am i going wrong

Configuration class

    @Configuration
public class MQConfig {

    public static final String OUTGOING_QUEUE = "my.outgoing.example";

    public static final String INCOMING_QUEUE = "my.incoming.example";

    public static final String DEAD_LETTER_QUEUE = "my.deadletter.queue.example";

    @Autowired
    private ConnectionFactory cachingConnectionFactory;

    // Setting the annotation listeners to use the jackson2JsonMessageConverter
    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(cachingConnectionFactory);
        factory.setMessageConverter(jackson2JsonMessageConverter());
        factory.setDefaultRequeueRejected(false);
        return factory;
    }

    // Standardize on a single objectMapper for all message queue items
    @Bean
    public Jackson2JsonMessageConverter jackson2JsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public Queue outgoingQueue() {
        Map<String, Object> args = new HashMap<String, Object>();
        args.put("x-dead-letter-exchange", "dlx");
        args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUE);
        args.put("x-message-ttl", 50000);
        return new Queue(OUTGOING_QUEUE, false, false, false, args);
    }

    @Bean
    public RabbitTemplate outgoingSender() {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(cachingConnectionFactory);
        rabbitTemplate.setQueue(outgoingQueue().getName());
        // rabbitTemplate.setRoutingKey(outgoingQueue().getName());
        rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter());
        return rabbitTemplate;
    }

    @Bean
    public Queue incomingQueue() {
        return new Queue(INCOMING_QUEUE);
    }

    @Bean
    public Queue deadLetterQueue() {
        return new Queue(DEAD_LETTER_QUEUE);
    }

    @Bean
    public DirectExchange dlx() {
        return new DirectExchange(DEAD_LETTER_QUEUE);
    }

    @Bean
    public Binding dlqBinding() {
        return BindingBuilder.bind(deadLetterQueue()).to(dlx()).with(DEAD_LETTER_QUEUE);
    }

}

Core logic

@Component
public class DeadLetterSendReceive {

  private static final Logger LOGGER = LoggerFactory.getLogger(DeadLetterSendReceive.class);

  @Autowired
  private RabbitTemplate outgoingSender;

  // Scheduled task to send an object every 5 seconds
  @Scheduled(fixedDelay = 5000)
  public void sender() {

    Integer int1[] = new Integer[]{10,20,30,40,50};  
    for (int i = 0; i <= int1.length; i++){
        System.out.println(int1[i]);

        if(int1[i]/10 == 1){
            throw new AmqpRejectAndDontRequeueException("to deadletter queue");
        }
        else{
            ExampleObject ex = new ExampleObject();
            ex.setValue(int1[i]);
             LOGGER.info("Sending example object at " + ex.getValue());
             outgoingSender.convertAndSend(ex);
        }

    }
  }

  // Annotation to listen for an ExampleObject
  @RabbitListener(queues = MQConfig.INCOMING_QUEUE)
  public void handleMessage(ExampleObject exampleObject) {
    LOGGER.info("Received incoming object at " + exampleObject.getValue());
  }

}

Pojo Class

import java.util.Date;

public class ExampleObject {

  private Date date = new Date();
  private int value;

  public int getValue() {
    return value;
}

public void setValue(int value) {
    this.value = value;
}

public ExampleObject() {
  }

  @Override
  public String toString() {
    return "ExampleObject{" +
        "date= " + date +
        '}';
  }

  public Date getDate() {
    return date;
  }

  public void setDate(Date date) {
    this.date = date;
  }

}

Upvotes: 2

Views: 3304

Answers (1)

Gary Russell
Gary Russell

Reputation: 174809

There are a couple of ways to do it; use the delayed message exchange plugin and publish the failed message to it. You can set a header to track how many attempts have been made.

Or you can do it with a dead letter queue with a TTL where the dead-letter queue is configured with dead-lettering to send the expired message back to the original queue. See my answer to this question and its link to another answer.

You can use the x-death header to track retries; it has been changed in recent brokers to now keep a count instead of keep adding new entries to the header.

To force the message to go to the DLQ, set defaultRequeueRejected to false or throw an AmqpRejectAndDontRequeueException.

Upvotes: 1

Related Questions