I am trying to make consuming messages from a RabbitMQ queue more efficient.
I have configured concurrency and prefetch count for the SimpleMessageListenerContainer in my Spring configuration, but I cannot get those settings to take effect.
Spring Boot 3.2.5
Spring Cloud 2023.0.1
spring-cloud-stream
spring-cloud-stream-binder-rabbit
Java 21
RabbitMQ Server running version 3.6.12
spring:
  application:
    name: app-third
  rabbitmq:
    host: ${RABBITMQ_URL:localhost}
    port: ${RABBITMQ_PORT:5672}
    virtual-host: ${RABBITMQ_VIRTUALHOST:}
    username: ${RABBITMQ_USER:guest}
    password: ${RABBITMQ_PASS:guest}
    listener:
      simple:
        retry:
          enabled: true
          max-attempts: 3
        concurrency: 5
        max-concurrency: 25
        prefetch: 10
  cloud:
    function:
      definition: process
    stream:
      bindings:
        process-in-0:
          destination: app.third-request
          group: app-third
        process-out-0:
          destination: app.third-response
          group: app-third
  threads:
    virtual:
      enabled: true
My Custom TaskExecutor Bean
@Bean
public TaskExecutor taskExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(20);
    executor.setMaxPoolSize(200);
    executor.setQueueCapacity(400);
    executor.initialize();
    return executor;
}
I can only get this to work if I create a custom SimpleMessageListenerContainer bean.
The RabbitMQ server then shows 6 consumers (there should be 5).
Five of the consumers have prefetch count 10, while one has prefetch count 1.
I suspect the extra consumer belongs to the SimpleMessageListenerContainer created from the Spring configuration, which runs in addition to the one I created with the custom bean.
@Bean
public SimpleMessageListenerContainer messageListenerContainer(
        ConnectionFactory connectionFactory,
        @Qualifier("taskExecutor") TaskExecutor taskExecutor,
        MessageListenerAdapter listenerAdapter
) {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    container.setQueueNames("app.third-request.app-third");
    container.setTaskExecutor(taskExecutor);
    container.setConcurrentConsumers(5);
    container.setMaxConcurrentConsumers(50);
    container.setPrefetchCount(10);
    return container;
}
However, this does not work with the Function<Request, Response> I use for the message queue; I would need to create a MessageListenerAdapter to read the messages.
Log Output:
[ main] o.s.a.r.l.SimpleMessageListenerContainer : Changing consumers from 1 to 1
[ main] o.s.a.r.l.SimpleMessageListenerContainer : No global properties bean
[ main] o.s.a.r.l.SimpleMessageListenerContainer : Starting Rabbit listener container.
[ app-third-1] o.s.a.r.listener.BlockingQueueConsumer : Starting consumer Consumer@13ddaffb: tags=[[]], channel=null, acknowledgeMode=AUTO local queue size=0
[ app-third-1] o.s.a.r.listener.BlockingQueueConsumer : Started on queue 'app.third-request.app-third' with tag amq.ctag-zP9jnSC-NwTYivlWOLf2kw:
Consumer@13ddaffb: tags=[[]], channel=Cached Rabbit Channel: AMQChannel(amqp://rabbitmq-instance@10.244.52.31:5672/rabbitmq-instance,1),
conn: Proxy@51172948 Shared Rabbit Connection: SimpleConnection@fe13916 [delegate=amqp://rabbitmq-instance@10.244.52.31:5672/rabbitmq-instance, localPort=55370], acknowledgeMode=AUTO local queue size=0
[ main] o.s.a.r.l.SimpleMessageListenerContainer : Starting Rabbit listener container.
[ taskExecutor-1] o.s.a.r.listener.BlockingQueueConsumer : Starting consumer Consumer@1ee22768: tags=[[]], channel=null, acknowledgeMode=AUTO local queue size=0
[ taskExecutor-5] o.s.a.r.listener.BlockingQueueConsumer : Starting consumer Consumer@38a52072: tags=[[]], channel=null, acknowledgeMode=AUTO local queue size=0
[ taskExecutor-3] o.s.a.r.listener.BlockingQueueConsumer : Starting consumer Consumer@746f8520: tags=[[]], channel=null, acknowledgeMode=AUTO local queue size=0
[ taskExecutor-2] o.s.a.r.listener.BlockingQueueConsumer : Starting consumer Consumer@6108fd23: tags=[[]], channel=null, acknowledgeMode=AUTO local queue size=0
[ taskExecutor-4] o.s.a.r.listener.BlockingQueueConsumer : Starting consumer Consumer@3d3a28b5: tags=[[]], channel=null, acknowledgeMode=AUTO local queue size=0
[ taskExecutor-5] o.s.a.r.c.CachingConnectionFactory : Creating cached Rabbit Channel from AMQChannel(amqp://rabbitmq-instance@10.244.52.31:5672/rabbitmq-instance,3)
[ taskExecutor-3] o.s.a.r.c.CachingConnectionFactory : Creating cached Rabbit Channel from AMQChannel(amqp://rabbitmq-instance@10.244.52.31:5672/rabbitmq-instance,4)
[ taskExecutor-1] o.s.a.r.c.CachingConnectionFactory : Creating cached Rabbit Channel from AMQChannel(amqp://rabbitmq-instance@10.244.52.31:5672/rabbitmq-instance,2)
[ taskExecutor-4] o.s.a.r.c.CachingConnectionFactory : Creating cached Rabbit Channel from AMQChannel(amqp://rabbitmq-instance@10.244.52.31:5672/rabbitmq-instance,6)
[ taskExecutor-2] o.s.a.r.c.CachingConnectionFactory : Creating cached Rabbit Channel from AMQChannel(amqp://rabbitmq-instance@10.244.52.31:5672/rabbitmq-instance,5)
[ taskExecutor-1] o.s.a.r.listener.BlockingQueueConsumer : Started on queue 'app.third-request.app-third' with tag amq.ctag-HrAsdYteFh7S8s2KPZsrDg:
Consumer@1ee22768: tags=[[]], channel=Cached Rabbit Channel: AMQChannel(amqp://rabbitmq-instance@10.244.52.31:5672/rabbitmq-instance,2),
conn: Proxy@51172948 Shared Rabbit Connection: SimpleConnection@fe13916 [delegate=amqp://rabbitmq-instance@10.244.52.31:5672/rabbitmq-instance, localPort=55370], acknowledgeMode=AUTO local queue size=0
4 More Started on queue
How can I get the SimpleMessageListenerContainer from my Spring configuration to honor the configured concurrency and prefetch count?
It does not seem that Spring or RabbitMQ is using my listener configuration.
I have created a sample application that demonstrates this problem.
https://github.com/DJViking/spring-boot-rabbitmq-demo
Running this Spring Boot example will fire up a RabbitMQ instance and configure a queue similar to my application.
When using Spring Cloud Stream with the functional programming model, I had to use the spring.cloud.stream.rabbit.bindings configuration properties instead of the spring.rabbitmq.listener properties:
spring:
  cloud:
    function:
      definition: processPayment
    stream:
      bindings:
        processPayment-in-0:
          destination: demo.payment-request
          group: demo-payment
        processPayment-out-0:
          destination: demo.payment-response
          group: demo-payment
      rabbit:
        bindings:
          processPayment-in-0:
            consumer:
              prefetch: 10
              single-active-consumer: false
              max-concurrency: 25
              container-type: simple
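The configuration above sets the prefetch and the upper consumer limit, but not the initial consumer count or the retry attempts that the question configured under spring.rabbitmq.listener. As a minimal sketch, assuming the binder takes those two values from the binder-agnostic consumer properties under spring.cloud.stream.bindings, the fragment below carries over the question's values (5 consumers, 3 attempts). It shows only the consumer settings for the same binding and has not been run against the demo project:

```yaml
spring:
  cloud:
    stream:
      bindings:
        processPayment-in-0:
          consumer:
            # Core (binder-agnostic) consumer properties.
            concurrency: 5    # initial consumer count; value taken from the question
            max-attempts: 3   # binder-level retry attempts; value taken from the question
      rabbit:
        bindings:
          processPayment-in-0:
            consumer:
              # Rabbit-binder-specific consumer properties.
              prefetch: 10
              max-concurrency: 25
              container-type: simple
```

If this works as assumed, the RabbitMQ management UI should show 5 consumers on the queue at startup, each with prefetch count 10, and no additional consumer with prefetch count 1.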