Berigoner

Reputation: 103

Configuring sessionAcknowledgeMode in DefaultMessageListenerContainer

I have a setup where I have to read a message from a queue in an ActiveMQ broker. Once the message is read I have to do a long-running operation on the message.

Due to this long-running operation on the message I want to acknowledge the message as soon as possible so the resources on the broker are released. The plan would be to execute the following steps once a message is received:

  1. Get message from ActiveMQ
  2. Insert message into DB
  3. Acknowledge message
  4. Do some long-running operation with the message

I've read about JMS and the different acknowledge modes, so before attempting this I set up a test application to try each mode and understand how it is processed. Unfortunately, I cannot get the output I want.

Following the information in this answer https://stackoverflow.com/a/10188078 as well as https://docs.spring.io/spring/docs/current/javadoc-api/org/springframework/jms/listener/DefaultMessageListenerContainer.html I thought that by using AUTO_ACKNOWLEDGE the message would be acknowledged before my listener is even called, but if I throw an exception in the listener the message is redelivered.

I've tried both with and without setSessionTransacted(true), but in both cases I get the same result: the message is redelivered when an exception is thrown in the JmsListener.

Configuration of JMS

    @Bean
    public ConnectionFactory connectionFactory() {
        ConnectionFactory connectionFactory =
                new ActiveMQConnectionFactory(jmsConfig.getBrokerUrl());

        return connectionFactory;
    }

    @Bean
    public JmsTemplate jmstemplate(){
        JmsTemplate jmsTemplate = new JmsTemplate();
        jmsTemplate.setConnectionFactory(connectionFactory());
        //jmsTemplate.setSessionTransacted(true);
        jmsTemplate.setDefaultDestinationName( jmsConfig.getQueueIn() );
        return jmsTemplate;
    }

    @Bean
    public JmsListenerContainerFactory jmsListenerContainerFactoryxxxx(
            ConnectionFactory connectionFactory,
            DefaultJmsListenerContainerFactoryConfigurer configurer) {

        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setSessionAcknowledgeMode(Session.AUTO_ACKNOWLEDGE);
        //factory.setConcurrency("1");
        factory.setSessionTransacted(true);

        configurer.configure(factory, connectionFactory);
        return factory;
    }

JmsListener

    @JmsListener(destination = "B1Q1", containerFactory = "jmsListenerContainerFactoryxxxx")
    public void receiveMessage(Message message) {
        try {
            TextMessage m = (TextMessage) message;
            String messageText = m.getText();

            int retryNum = message.getIntProperty("JMSXDeliveryCount");
            long s = message.getLongProperty("JMSTimestamp");

            Date d = new Date( s );

            String dbText = String.format("Retry %d. Message: %s", retryNum, messageText);

            if ( messageText.toLowerCase().contains("exception") ) {
                logger.info("Creating exception for retry: {}", retryNum);
                throw new RuntimeException();
            }
        } catch (JMSException e) {
            logger.error("Exception!!", e);
        }
    }

How should I change the code so that the message is not redelivered when an exception is thrown?

Going back to my real application, where I would be inserting the message into the DB: how can I acknowledge the message in my JmsListener after it has been inserted into the DB but before executing the long-running task?

Upvotes: 3

Views: 5266

Answers (2)

rekha

Reputation: 1

    factory.setSessionAcknowledgeMode(Tibjms.EXPLICIT_CLIENT_ACKNOWLEDGE);
    // factory.setSessionTransacted(false); // has no effect when called here, before configure()
    factory.setTaskExecutor(new SimpleAsyncTaskExecutor("KDBMessageListener-"));
    configurer.configure(factory, connectionFactory);
    factory.setSessionTransacted(false); // works when called after configure()

Upvotes: 0

Berigoner

Reputation: 103

To use AUTO_ACKNOWLEDGE or CLIENT_ACKNOWLEDGE, I had to call factory.setSessionTransacted(false) after configuring the factory.

Calling configurer.configure(factory, connectionFactory) overrides the value of sessionTransacted. In my case it set it to true, which rendered AUTO_ACKNOWLEDGE and CLIENT_ACKNOWLEDGE ineffective. Here's the relevant code of DefaultJmsListenerContainerFactoryConfigurer.java:

    public void configure(DefaultJmsListenerContainerFactory factory, ConnectionFactory connectionFactory) {
        ...
        ...
        if (this.transactionManager != null) {
            factory.setTransactionManager(this.transactionManager);
        } else {
            factory.setSessionTransacted(true);
        }
        ...
        ...
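
For the scenario in the question, the ordering could look roughly like the sketch below. It assumes Spring Boot with the javax.jms API (newer versions use jakarta.jms instead). The bean name clientAckFactory, the class names, and the helpers saveToDatabase and runLongTask are hypothetical placeholders, not part of the original code. With CLIENT_ACKNOWLEDGE on a non-transacted session, calling message.acknowledge() inside the listener should acknowledge the message before the long-running work starts.

```java
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.springframework.boot.autoconfigure.jms.DefaultJmsListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.stereotype.Component;

@Configuration
@EnableJms
public class JmsAckConfig {

    @Bean
    public DefaultJmsListenerContainerFactory clientAckFactory(
            ConnectionFactory connectionFactory,
            DefaultJmsListenerContainerFactoryConfigurer configurer) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();

        // Apply the Boot defaults first; as shown above, this can set
        // sessionTransacted to true when no transaction manager is present.
        configurer.configure(factory, connectionFactory);

        // Override afterwards so the acknowledge mode actually takes effect.
        factory.setSessionTransacted(false);
        factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
        return factory;
    }
}

@Component
class QueueListener {

    @JmsListener(destination = "B1Q1", containerFactory = "clientAckFactory")
    public void receiveMessage(Message message) throws JMSException {
        String text = ((TextMessage) message).getText();

        // Steps 2 and 3 from the question: persist first, then acknowledge.
        saveToDatabase(text);
        message.acknowledge();

        // Step 4: an exception thrown from here on should not cause
        // redelivery, because the message has already been acknowledged.
        runLongTask(text);
    }

    // Hypothetical placeholder for the DB insert.
    private void saveToDatabase(String text) {
    }

    // Hypothetical placeholder for the long-running operation.
    private void runLongTask(String text) {
    }
}
```

If the insert in saveToDatabase fails, the exception leaves the listener before acknowledge() is called, so the broker can redeliver the message. Keep in mind that in CLIENT_ACKNOWLEDGE mode acknowledge() covers every message consumed so far on that session, not only the current one.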

Upvotes: 4
