Reputation: 511
I am processing a high volume stream ~ 500+ msgs per second, The data is consumed off Spring AMQP+Rabbit using a SimpleMessageListenerContainer with 10 concurrent consumers, I have to do some checks on the Db every 15 mins and reload certain properties for processing, this is done with a quartz trigger which fires every 15 mins, stops the SimplelistenerContainer, does the necessary work and starts the Container once again.
Everything works perfectly when the app starts up, when the trigger fires and the Container restarts, I see the same message being delivered multiple times,this causes a lot of duplicates. There are no exeptions thrown by the consumers.
The Message Listener
class RoundRobinQueueListener implements MessageListener {
@Override
public void onMessage(Message message) { //do processing
}
}
During app startup set up parallel consumers and start the consumer
final SimpleMessageListenerContainer messageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);
RoundRobinQueueListener roundRobinListener = RoundRobinQueueListener.class.newInstance();
messageListenerContainer.setQueueNames(queueName);
messageListenerContainer.setMessageListener(roundRobinListener);
messageListenerContainer.setConcurrentConsumers(10);
messageListenerContainer.setChannelTransacted(true);
The quartz trigger
void execute(JobExecutionContext context) throws JobExecutionException {
messageListenerContainer.stop()
//Do db task, other processing
messageListenerContainer.start()
}
Upvotes: 3
Views: 3389
Reputation: 1133
Looks like your messages are now acknowledged by the consumer. If you are not using auto acknowledge mode, you need to acknowledge the message by yourself (This can also be configured at the SimpleMessageListenerContainer). Otherwise, the broker presumes that the message was not processed successfully and tries to deliver it again.
Upvotes: 1