Per Steffensen
Per Steffensen

Reputation: 633

Spring handling RabbitMQ messages concurrently

I am fairly new to message-handling with Spring, so bear with me.

I would like my RabbitMQ message-handler to handle messages concurrently in several threads.

@Component
public class ConsumerService {

    @RabbitListener(queues = {"q"})
    public void messageHandler(@Payload M msg) {
        System.out.println(msg);        
    }

}

...

@Configuration
@Import({MessageConverterConfiguration.class, ConsumerService.class})
public class ConsumerConfiguration {

    @Autowired
    private ConnectionFactory connectionFactory;

    @Bean
    public List<Declarable> declarations() {
        return Arrays.asList(
            new DirectExchange("e", true, false),
            new Queue("q", true, false, false),
            new Binding("q", Binding.DestinationType.QUEUE, "e", "q", null)
        );
    }

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(MessageConverter contentTypeConverter, SimpleRabbitListenerContainerFactoryConfigurer configurer) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConcurrentConsumers(10);
        configurer.configure(factory, connectionFactory);
        factory.setMessageConverter(contentTypeConverter);
        return factory;
    }
}

In my small test there are 4 messages on queue "q". I get to process them all. That is fine. But I get to process them one by one. If I set a breakpoint in "ConsumerService.messageHandler" (essentially delaying the completion of handling a message) I would like to end up having 4 threads in that breakpoint. But I never have more than one thread. As soon as I let it run to complete handling of a message, the next message gets to be handled. What do I need to do to handle the messages concurrently?

Upvotes: 3

Views: 4727

Answers (2)

Per Steffensen
Per Steffensen

Reputation: 633

Sorry, I forgot to write that I got it working. Essentially what I have now is:

...
factory.setConcurrentConsumers(10);
factory.setMaxConcurrentConsumers(20);
factory.setConsecutiveActiveTrigger(1);
factory.setConsecutiveIdleTrigger(1);
factory.setPrefetchCount(100);
...

I do believe with concurrentConsumers alone it will actually eventually (under enough load) handle messages in parallel. Problem was that I had only 4 messages in my little test, and it will never bother to activate more than one consumer(-thread) for that. Setting consecutiveActiveTrigger to 1 helps here. Guess prefetchCount also has something to say. Anyway, case closed.

Upvotes: 4

kleash
kleash

Reputation: 1319

There are two ways of achieving this

  • Either use a threadpool to handle messae processing at your consumer.
  • Or, create multiple consumer.

I saw you are using concurrentConsumers property to automatically handling of creating multiple consumers by Spring AMQP. Try setting the PrefetchCount to 1 and set MaxConcurrentConsumers also.

Most probably you already have four messages in queues and as default value of Prefetch Count is large only one consumer is consuming all the messages present on queue.

Upvotes: 3

Related Questions