Reputation: 343
I am having 2 applications exchanging data using RabbitMQ. I have implemented this using Spring AMQP. I have scenario once the message has been consumed from the consumer might encounter an exception while processing.
If any exception comes i am planning to log into the database. I have to remove message from the queue explicitly once the message reaches the consumer whether it is successful processing or error encountered.
How to forcefully remove the message from queue otherwise it will be there if my application fails to process it?
Below is my Listener code
@RabbitListener(containerFactory="rabbitListenerContainerFactory",queues=Constants.JOB_QUEUE)
public void handleMessage(JobListenerDTO jobListenerDTO) {
//System.out.println("Received summary: " + jobListenerDTO.getProcessXML());
//amqpAdmin.purgeQueue(Constants.JOB_QUEUE, true);
try{
Map<String, Object> variables = new HashMap<String, Object>();
variables.put("initiator", "cmy5kor");
Deployment deploy = repositoryService.createDeployment().addString(jobListenerDTO.getProcessId()+".bpmn20.xml",jobListenerDTO.getProcessXML()).deploy();
ProcessInstance processInstance = runtimeService.startProcessInstanceByKey(jobListenerDTO.getProcessId(), variables);
System.out.println("Process Instance is:::::::::::::"+processInstance);
}catch(Exception e){
e.printStackTrace();
}
Configuration Code
@Configuration
@EnableRabbit
public class RabbitMQJobConfiguration extends AbstractBipRabbitConfiguration {
@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory());
template.setQueue(Constants.JOB_QUEUE);
template.setMessageConverter(jsonMessageConverter());
return template;
}
@Bean
public Queue jobQueue() {
return new Queue(Constants.JOB_QUEUE);
}
@Bean(name="rabbitListenerContainerFactory")
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory());
Jackson2JsonMessageConverter messageConverter = new Jackson2JsonMessageConverter();
DefaultClassMapper classMapper = new DefaultClassMapper();
Map<String, Class<?>> idClassMapping = new HashMap<String, Class<?>>();
idClassMapping.put("com.bosch.diff.approach.TaskMessage", JobListenerDTO.class);
classMapper.setIdClassMapping(idClassMapping);
messageConverter.setClassMapper(classMapper);
factory.setMessageConverter(messageConverter);
factory.setReceiveTimeout(10L);
return factory;
}
}
Upvotes: 2
Views: 4529
Reputation: 174504
As long as your listener catches the exception the message will be removed from the queue.
If your listener throws an exception, it will be requeued by default; that behavior can be modified by throwing a AmqpRejectAndDontRequeueException
or setting the defaultRequeueRejected
property - see the documentation for details.
Upvotes: 1
Reputation: 10170
I don't know about spring api or configuration for rmq but this
I have to remove message from the queue explicitly once the message reaches the consumer whether it is successful processing or error encountered.
is exactly what is happening when you set the auto-acknowledge flag. In that way, the message is acknowledged as soon as it's consumed - so gone from the queue.
Upvotes: 3