TechCoze
TechCoze

Reputation: 501

Disable DLQ and re-delivery for ActivemMQ messages

I am developing system where messages to be processed are pushed in ActiveMQ. I have stringent requirement that consumer must process messages in incoming sequence. If a message processing fails in consumer, it need to rollback/recover and keep on re-trying infinitely. Only when a message processing is successful, consumer need to commit and proceed to next message.

How to prevent rolled-back message auto-forwarding to DLQ and what's proper way of configuring re-delivery policy for such requirement?

Upvotes: 2

Views: 1420

Answers (1)

Hassen Bennour
Hassen Bennour

Reputation: 3913

when set RedeliveryPolicy re-trying infinitely like below the messages will never be sent to DLQ.

policy.setMaximumRedeliveries(RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES);

with ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE you acknowledge messages one by one.

http://activemq.apache.org/redelivery-policy.html

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQMessageConsumer;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.RedeliveryPolicy;

public class SimpleConsumerIndividualAcknowledge {

    public static void main(String[] args) throws JMSException {
        Connection conn = null;
        try {
            ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616");
            RedeliveryPolicy policy = new RedeliveryPolicy();
            policy.setMaximumRedeliveries(RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES);
            cf.setRedeliveryPolicy(policy);
            conn = cf.createConnection();
            ActiveMQSession session = (ActiveMQSession) conn.createSession(false,
                    ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
            ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) session
                    .createConsumer(session.createQueue("test"));
            consumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    try {
                        //do your stuff
                        message.acknowledge();
                    } catch (Exception e) {
                        throw new RuntimeException(e);//ActiveMQMessageConsumer.rollback() is called automatically
                    }
                }
            });
            conn.start();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (conn != null) {
                try {
                    conn.close();
                } catch (Exception e) {
                }
            }
        }
    }
}

if you want to manually stop and restart consumer take a look here activemq-redelivery-does-not-work

Upvotes: 2

Related Questions