user1554876
user1554876

Reputation: 324

JMS Configuring backoff/retry without blocking onMessage()

javax.JMS version 2.0.1

Provider : ibm.mq v9.0

Framework : Java Spring boot

From what I know, onMessage() is asynchronous. I am successfully retrying the message send. However, the re-sending of messages happens instantaneously after a message failure. Ideally I want the retry to happen in a sliding window style eg. First retry after 20 seconds, second retry after 40 etc.

How can I achieve this without a Thread.Sleep() which, I presume, will block the entire Java thread and is not something I want at all ?

Code is something like this

final int TIME_TO_WAIT = 20;

public void onMessage(Message , message)
{
   :
   :
   int t  =   message.getIntProperty("JMSXDeliveryCount");
   if(t > 1)
   {
     // Figure out a way to wait for (TIME_TO_WAIT * t)
   
   }

}
catch(Exception e)
{
    // Do some logging/cleanup etc.
    throw new RunimeException(e);// this causes a message retry
}

Upvotes: 0

Views: 3818

Answers (1)

sonus21
sonus21

Reputation: 5388

I would suggest you use exponential backoff in the retry logic, but you would need to use the Delivery Delay feature.

Define a custom JmsTemplate that will use delay property from the message, you should add retry count in the message property as well so that you can delay as per your need like 20, 40, 80, 160, etc

public class DelayedJmsTemplate extends JmsTemplate {
    public static String DELAY_PROPERTY_NAME = "deliveryDelay";
        
    @Override
    protected void doSend(MessageProducer producer, Message message) throws JMSException {
        long delay = -1;
        if (message.propertyExists(DELAY_PROPERTY_NAME)) {
            delay = message.getLongProperty(DELAY_PROPERTY_NAME);
        }
        if (delay >= 0) {
            producer.setDeliveryDelay(delay);
        }
        if (isExplicitQosEnabled()) {
            producer.send(message, getDeliveryMode(), getPriority(), getTimeToLive());
        } else {
            producer.send(message);
        }
    }
}

Define Components, that will have the capability fo re-enqueue of the message, you can define this interface in the base message listener. The handleException method should do all the tasks of enqueue and computing delay etc. You may not always interested in enqueuing, in some cases, you would discard messages as well.

You can see a similar post-processing logic here https://github.com/sonus21/rqueue/blob/4c9c5c88f02e5cf0ac4b16129fe5b880411d7afc/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/PostProcessingHandler.java

@Component
@Sl4j
public class MessageListener {
    private final JmsTemplate jmsTemplate;
    
    @Autowired
    public MessageListener(JmsTemplate jmsTemplate) {
        this.jmsTemplate = jmsTemplate;
    }
        
    @JmsListener(destination = "myDestination")
    public void onMessage(Message message) throws JMSException {
        try {
            // do something
        } catch (Exception e) {
            handleException("myDestination", message, e);
        }
    }
    
    // Decide whether the message should be ignored due to many retries etc
    private boolean shouldBeIgnored(String destination, Message message) {
        return false;
    }
    
    // add logic to compute delay
    private long getDelay(String destination, Message message, int deliveryCount) {
        return 100L;
    }
    
    private void handleException(String destination, Message message, Exception e) throws JMSException {
        if (shouldBeIgnored(destination, message)) {
            log.info("destination: {}, message: {} is ignored ", destination, message, e);
            return;
        }
        if (message.propertyExists("JMSXDeliveryCount")) {
            int t = message.getIntProperty("JMSXDeliveryCount");
            long delay = getDelay(destination, message, t + 1);

            message.setLongProperty(DELAY_PROPERTY_NAME, delay);
            message.setIntProperty("JMSXDeliveryCount", t + 1);
            jmsTemplate.send(destination, session -> message);
        } else {
            // no delivery count, is this the first message or should be ignored?
        }
    }
}

Upvotes: 1

Related Questions