solarein
solarein

Reputation: 557

Spring AMQP single consumer parallelism with prefetch

We have a project that's using Spring-AMQP to consume messages from our RabbitMQ broker. We would like to increase the concurrency on the consuming side so that multiple worker threads can process messages in parallel. I began by reading the documentation for the native RabbitMQ client, and that lead me to a design of using a single consumer, and a prefetch count > 1 to control for parallelism. Using the RabbitMQ client directly, this seems quite natural. The handleDelivery method of the DefaultConsumer can spawn a new Runnable that does the work and acknowledges the message at the end of the work. The prefetch count effectively controls the maximum number of Runnables that the consumer spawns.

However, this design does not seem translatable to the Spring-AMQP world. In the SimpleMessageListenerContainer, for each AMQP consumer, all messages get delivered into a single BlockingQueueConsumer, and a single thread delivers messages from the BlockingQueueConsumer's blocking queue to the MessageListener. Even though the SimpleMessageListenerContainer supports a TaskExecutor, the TaskExecutor is only used to run one task per consumer. So to process multiple messages in parallel one has to use multiple consumers.

This leads me to a few questions about parallelism with Spring-AMQP. First, is my initial design with single consumer and high prefetch count a valid way to achieve parallelism with AMQP? If so, why does Spring-AMQP eschew this design in favor of a thread-per-consumer design? And is it possible to customize Spring-AMQP to make single-consumer parallelism possible?

Upvotes: 1

Views: 2171

Answers (1)

Gary Russell
Gary Russell

Reputation: 174554

Spring AMQP was designed against an earlier version of the rabbit client library which only had one thread per connection.

The handleDelivery method of the DefaultConsumer can spawn a new Runnable that does the work and acknowledges the message at the end of the work.

That doesn't really buy you a lot more than simply increasing the concurrentConsumers - the only difference is there's a consumer for each thread, but there's not a whole lot of overhead there.

You can do what you want, however, using a ChannelAwareMessageListener and set the acknowledgeMode to MANUAL - then your listener is responsible for acking the message(s).

In 2.0 (next year) we will have an alternative listener container that will call the listener directly on the client library thread. But, it's a fair amount of work. This (closed) pull request has an initial PoC but it's not a fully featured container yet.

Upvotes: 2

Related Questions