Reputation: 633
I am fairly new to message-handling with Spring, so bear with me.
I would like my RabbitMQ message-handler to handle messages concurrently in several threads.
@Component
public class ConsumerService {

    @RabbitListener(queues = {"q"})
    public void messageHandler(@Payload M msg) {
        System.out.println(msg);
    }
}
...
@Configuration
@Import({MessageConverterConfiguration.class, ConsumerService.class})
public class ConsumerConfiguration {

    @Autowired
    private ConnectionFactory connectionFactory;

    @Bean
    public List<Declarable> declarations() {
        return Arrays.asList(
            new DirectExchange("e", true, false),
            new Queue("q", true, false, false),
            new Binding("q", Binding.DestinationType.QUEUE, "e", "q", null)
        );
    }

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
            MessageConverter contentTypeConverter,
            SimpleRabbitListenerContainerFactoryConfigurer configurer) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConcurrentConsumers(10);
        configurer.configure(factory, connectionFactory);
        factory.setMessageConverter(contentTypeConverter);
        return factory;
    }
}
In my small test there are 4 messages on queue "q". All of them get processed, which is fine, but they are processed one at a time. If I set a breakpoint in "ConsumerService.messageHandler" (essentially delaying the completion of handling a message), I would expect to end up with 4 threads stopped at that breakpoint. Instead, I never see more than one thread; only when I let it finish handling a message does the next message get handled. What do I need to do to handle the messages concurrently?
Upvotes: 3
Views: 4727
Reputation: 633
Sorry, I forgot to write that I got it working. Essentially what I have now is:
...
factory.setConcurrentConsumers(10);
factory.setMaxConcurrentConsumers(20);
factory.setConsecutiveActiveTrigger(1);
factory.setConsecutiveIdleTrigger(1);
factory.setPrefetchCount(100);
...
I believe that with concurrentConsumers alone it will eventually (under enough load) handle messages in parallel. The problem was that I had only 4 messages in my little test, and for that few it never bothered to activate more than one consumer thread. Setting consecutiveActiveTrigger to 1 helps here, and I guess prefetchCount plays a part too. Anyway, case closed.
Upvotes: 4
Reputation: 1319
There are two ways of achieving this.
I see you are using the concurrentConsumers property to let Spring AMQP create multiple consumers automatically. Try setting prefetchCount to 1, and set maxConcurrentConsumers as well.
Most probably the four messages were already on the queue, and because the default prefetch count is large, a single consumer fetched all of them.
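To illustrate the prefetch explanation above, here is a minimal sketch in plain Java. It is a toy model, not RabbitMQ or Spring AMQP code: the class PrefetchDemo and the method distribute are hypothetical names, the value 250 only stands in for "a large prefetch", and the model assumes the messages are already queued, the consumers subscribe one after another, and nothing is acknowledged (every handler is stuck at the breakpoint).

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;

public class PrefetchDemo {

    /**
     * Toy model of prefetch: `queued` messages are already waiting, then
     * `consumers` consumers subscribe one after another. Each consumer may
     * hold at most `prefetch` unacknowledged messages, and no message is
     * ever acknowledged. Returns how many messages each consumer holds.
     */
    public static int[] distribute(int queued, int consumers, int prefetch) {
        Deque<Integer> queue = new ArrayDeque<>();
        for (int i = 0; i < queued; i++) {
            queue.add(i);
        }
        int[] held = new int[consumers];
        for (int c = 0; c < consumers; c++) {
            // The newly subscribed consumer is handed messages until it
            // reaches its prefetch limit or the queue runs dry.
            while (held[c] < prefetch && !queue.isEmpty()) {
                queue.poll();
                held[c]++;
            }
        }
        return held;
    }

    public static void main(String[] args) {
        // Large prefetch: the first consumer takes all 4 messages.
        System.out.println(Arrays.toString(distribute(4, 10, 250)));
        // prints [4, 0, 0, 0, 0, 0, 0, 0, 0, 0]

        // Prefetch of 1: the 4 messages are spread over 4 consumers.
        System.out.println(Arrays.toString(distribute(4, 10, 1)));
        // prints [1, 1, 1, 1, 0, 0, 0, 0, 0, 0]
    }
}
```

In this model a consumer that holds several unacknowledged messages still handles them one at a time on its own thread, which would match the single thread seen at the breakpoint in the question.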
Upvotes: 3