DJViking

Spring Boot RabbitMQ configured concurrency not working

I am trying to make consuming messages from a RabbitMQ Queue more efficient.

I cannot get the concurrency and prefetch count from my Spring configuration applied to the RabbitMQ SimpleMessageListenerContainer.

Spring Boot 3.2.5
Spring Cloud 2023.0.1
spring-cloud-stream
spring-cloud-stream-binder-rabbit
Java 21
RabbitMQ Server running version 3.6.12

spring:
  application:
    name: app-third
  rabbitmq:
    host: ${RABBITMQ_URL:localhost}
    port: ${RABBITMQ_PORT:5672}
    virtual-host: ${RABBITMQ_VIRTUALHOST:}
    username: ${RABBITMQ_USER:guest}
    password: ${RABBITMQ_PASS:guest}
    listener:
      simple:
        retry:
          enabled: true
          max-attempts: 3
        concurrency: 5
        max-concurrency: 25
        prefetch: 10
  cloud:
    function:
      definition: process
    stream:
      bindings:
        process-in-0:
          destination: app.third-request
          group: app-third
        process-out-0:
          destination: app.third-response
          group: app-third
  threads:
    virtual:
      enabled: true

My Custom TaskExecutor Bean

@Bean
public TaskExecutor taskExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(20);
    executor.setMaxPoolSize(200);
    executor.setQueueCapacity(400);
    executor.initialize();
    return executor;
}

I can only get this to work if I create a custom SimpleMessageListenerContainer bean.
With that bean, the RabbitMQ server shows 6 consumers (there should be 5).
Five of the consumers have a prefetch count of 10, while one has a prefetch count of 1.
I suspect the extra consumer belongs to the SimpleMessageListenerContainer created from the Spring configuration, which runs in addition to the one I created with the custom bean.

@Bean
public SimpleMessageListenerContainer messageListenerContainer(
    ConnectionFactory connectionFactory,
    @Qualifier("taskExecutor") TaskExecutor taskExecutor,
    MessageListenerAdapter listenerAdapter
) {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    container.setQueueNames("app.third-request.app-third");
    container.setTaskExecutor(taskExecutor);
    container.setConcurrentConsumers(5);
    container.setMaxConcurrentConsumers(50);
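    container.setPrefetchCount(10);
    // Wire in the injected adapter so the container has a listener to dispatch to.
    container.setMessageListener(listenerAdapter);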
    return container;
}

However, this does not work with the Function<Request, Response> that handles the messages from the queue. I would need to create a MessageListenerAdapter to read the messages instead.

Log Output:

[           main] o.s.a.r.l.SimpleMessageListenerContainer : Changing consumers from 1 to 1
[           main] o.s.a.r.l.SimpleMessageListenerContainer : No global properties bean
[           main] o.s.a.r.l.SimpleMessageListenerContainer : Starting Rabbit listener container.
[    app-third-1] o.s.a.r.listener.BlockingQueueConsumer   : Starting consumer Consumer@13ddaffb: tags=[[]], channel=null, acknowledgeMode=AUTO local queue size=0
[    app-third-1] o.s.a.r.listener.BlockingQueueConsumer   : Started on queue 'app.third-request.app-third' with tag amq.ctag-zP9jnSC-NwTYivlWOLf2kw: 
  Consumer@13ddaffb: tags=[[]], channel=Cached Rabbit Channel: AMQChannel(amqp://rabbitmq-instance@10.244.52.31:5672/rabbitmq-instance,1), 
  conn: Proxy@51172948 Shared Rabbit Connection: SimpleConnection@fe13916 [delegate=amqp://rabbitmq-instance@10.244.52.31:5672/rabbitmq-instance, localPort=55370], acknowledgeMode=AUTO local queue size=0

[           main] o.s.a.r.l.SimpleMessageListenerContainer : Starting Rabbit listener container.
[ taskExecutor-1] o.s.a.r.listener.BlockingQueueConsumer   : Starting consumer Consumer@1ee22768: tags=[[]], channel=null, acknowledgeMode=AUTO local queue size=0
[ taskExecutor-5] o.s.a.r.listener.BlockingQueueConsumer   : Starting consumer Consumer@38a52072: tags=[[]], channel=null, acknowledgeMode=AUTO local queue size=0
[ taskExecutor-3] o.s.a.r.listener.BlockingQueueConsumer   : Starting consumer Consumer@746f8520: tags=[[]], channel=null, acknowledgeMode=AUTO local queue size=0
[ taskExecutor-2] o.s.a.r.listener.BlockingQueueConsumer   : Starting consumer Consumer@6108fd23: tags=[[]], channel=null, acknowledgeMode=AUTO local queue size=0
[ taskExecutor-4] o.s.a.r.listener.BlockingQueueConsumer   : Starting consumer Consumer@3d3a28b5: tags=[[]], channel=null, acknowledgeMode=AUTO local queue size=0
[ taskExecutor-5] o.s.a.r.c.CachingConnectionFactory       : Creating cached Rabbit Channel from AMQChannel(amqp://rabbitmq-instance@10.244.52.31:5672/rabbitmq-instance,3)
[ taskExecutor-3] o.s.a.r.c.CachingConnectionFactory       : Creating cached Rabbit Channel from AMQChannel(amqp://rabbitmq-instance@10.244.52.31:5672/rabbitmq-instance,4)
[ taskExecutor-1] o.s.a.r.c.CachingConnectionFactory       : Creating cached Rabbit Channel from AMQChannel(amqp://rabbitmq-instance@10.244.52.31:5672/rabbitmq-instance,2)
[ taskExecutor-4] o.s.a.r.c.CachingConnectionFactory       : Creating cached Rabbit Channel from AMQChannel(amqp://rabbitmq-instance@10.244.52.31:5672/rabbitmq-instance,6)
[ taskExecutor-2] o.s.a.r.c.CachingConnectionFactory       : Creating cached Rabbit Channel from AMQChannel(amqp://rabbitmq-instance@10.244.52.31:5672/rabbitmq-instance,5)
[ taskExecutor-1] o.s.a.r.listener.BlockingQueueConsumer   : Started on queue 'app.third-request.app-third' with tag amq.ctag-HrAsdYteFh7S8s2KPZsrDg: 
    Consumer@1ee22768: tags=[[]], channel=Cached Rabbit Channel: AMQChannel(amqp://rabbitmq-instance@10.244.52.31:5672/rabbitmq-instance,2), 
    conn: Proxy@51172948 Shared Rabbit Connection: SimpleConnection@fe13916 [delegate=amqp://rabbitmq-instance@10.244.52.31:5672/rabbitmq-instance, localPort=55370], acknowledgeMode=AUTO local queue size=0
(4 more "Started on queue" entries omitted)

How can I get the SimpleMessageListenerContainer from my Spring Configuration to work as configured with concurrency and prefetch count?

It does not seem like Spring and/or RabbitMQ is using my listener configuration.

I have created a sample application that demonstrates this problem:
https://github.com/DJViking/spring-boot-rabbitmq-demo
Running this Spring Boot example will fire up a RabbitMQ instance and configure a queue similar to the one in my application.

Answers (1)

DJViking

When using Spring Cloud Stream with the functional programming model, the binder creates the listener container, so I had to use the spring.cloud.stream.rabbit.bindings.<binding>.consumer configuration properties and not the spring.rabbitmq.listener properties:

spring:
  cloud:
    function:
      definition: processPayment
    stream:
      bindings:
        processPayment-in-0:
          destination: demo.payment-request
          group: demo-payment
        processPayment-out-0:
          destination: demo.payment-response
          group: demo-payment
      rabbit:
        bindings:
          processPayment-in-0:
            consumer:
              prefetch: 10
              single-active-consumer: false
              max-concurrency: 25
              container-type: simple
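
Note that the configuration above sets max-concurrency but no initial concurrency. As far as I can tell, the starting number of consumers is the core binding property spring.cloud.stream.bindings.<binding>.consumer.concurrency (default 1) rather than a Rabbit-specific one. A minimal sketch that mirrors the values from the question (5 consumers, scaling up to 25, prefetch 10) might look like this; the binding and destination names are the ones from the demo project, so adjust them to your own:

```yaml
spring:
  cloud:
    stream:
      bindings:
        processPayment-in-0:
          destination: demo.payment-request
          group: demo-payment
          consumer:
            # Core Spring Cloud Stream property: initial number of consumers
            concurrency: 5
      rabbit:
        bindings:
          processPayment-in-0:
            consumer:
              # Rabbit binder properties
              prefetch: 10
              max-concurrency: 25
              container-type: simple
```

With this in place, the custom SimpleMessageListenerContainer and TaskExecutor beans from the question should no longer be needed. Dropping them should also leave only the binder's consumers on the queue instead of two containers consuming side by side.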
