Chandan
Chandan

Reputation: 343

Auto Delete Messages from Queue Once consumed using Spring AMQP

I am having 2 applications exchanging data using RabbitMQ. I have implemented this using Spring AMQP. I have scenario once the message has been consumed from the consumer might encounter an exception while processing.

If any exception comes i am planning to log into the database. I have to remove message from the queue explicitly once the message reaches the consumer whether it is successful processing or error encountered.

How to forcefully remove the message from queue otherwise it will be there if my application fails to process it?

Below is my Listener code

 @RabbitListener(containerFactory="rabbitListenerContainerFactory",queues=Constants.JOB_QUEUE)
            public void handleMessage(JobListenerDTO jobListenerDTO) {
                //System.out.println("Received summary: " + jobListenerDTO.getProcessXML());
                //amqpAdmin.purgeQueue(Constants.JOB_QUEUE, true);
                try{
                    Map<String, Object> variables = new HashMap<String, Object>();  
                    variables.put("initiator", "cmy5kor");

                    Deployment deploy = repositoryService.createDeployment().addString(jobListenerDTO.getProcessId()+".bpmn20.xml",jobListenerDTO.getProcessXML()).deploy();
                    ProcessInstance processInstance = runtimeService.startProcessInstanceByKey(jobListenerDTO.getProcessId(), variables);

                    System.out.println("Process Instance is:::::::::::::"+processInstance);

                }catch(Exception e){

                    e.printStackTrace();
            }

Configuration Code

@Configuration
@EnableRabbit
public class RabbitMQJobConfiguration extends AbstractBipRabbitConfiguration {


    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(connectionFactory());
        template.setQueue(Constants.JOB_QUEUE);
        template.setMessageConverter(jsonMessageConverter());
        return template;
    }


    @Bean
    public Queue jobQueue() {
        return new Queue(Constants.JOB_QUEUE);
    }


    @Bean(name="rabbitListenerContainerFactory")
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory());
        Jackson2JsonMessageConverter messageConverter = new Jackson2JsonMessageConverter();
        DefaultClassMapper classMapper = new DefaultClassMapper();
        Map<String, Class<?>> idClassMapping = new HashMap<String, Class<?>>();
        idClassMapping.put("com.bosch.diff.approach.TaskMessage", JobListenerDTO.class);
        classMapper.setIdClassMapping(idClassMapping);
        messageConverter.setClassMapper(classMapper);
        factory.setMessageConverter(messageConverter);
        factory.setReceiveTimeout(10L);
        return factory;
    }



}

Upvotes: 2

Views: 4529

Answers (2)

Gary Russell
Gary Russell

Reputation: 174504

As long as your listener catches the exception the message will be removed from the queue.

If your listener throws an exception, it will be requeued by default; that behavior can be modified by throwing a AmqpRejectAndDontRequeueException or setting the defaultRequeueRejected property - see the documentation for details.

Upvotes: 1

cantSleepNow
cantSleepNow

Reputation: 10170

I don't know about spring api or configuration for rmq but this

 I have to remove message from the queue explicitly once the message reaches the consumer whether it is successful processing or error encountered.

is exactly what is happening when you set the auto-acknowledge flag. In that way, the message is acknowledged as soon as it's consumed - so gone from the queue.

Upvotes: 3

Related Questions