DocJones

Reputation: 677

Spring Boot RabbitMQ with concurrent consumers

I have a simple architecture in which one of three processes (each actually a Dockerized Spring Boot container) produces thousands of messages to an exchange/queue, and all three processes consume those messages.

I want the consumer side to be multi-threaded to achieve maximum throughput, but with my current configuration I don't get the behavior I want.

Here is the setup:

Listener:

  @RabbitListener(
      bindings = @QueueBinding(
          value = @Queue(
              value = RabbitMQConfiguration.BORD_QUEUE,
              durable = "true",
              arguments = @Argument(name = "x-queue-mode", value = "lazy")),
          exchange = @Exchange(value = RabbitMQConfiguration.QUEUE_EXCHANGE),
          key = "B"),
      concurrency = "8-8") // minimum 8 and maximum 8 concurrent consumers
  public void listenBordero(Long bId, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
    log.info("Received message: {{}}", bId); // logs the id wrapped in braces, e.g. {436}
    processor.process(bId);
  }

Here is the (possibly relevant) Spring Boot application.properties configuration:

spring.rabbitmq.listener.type=simple
spring.rabbitmq.listener.simple.prefetch=1
spring.rabbitmq.listener.simple.max-concurrency=8
spring.rabbitmq.listener.simple.concurrency=8

I believe not all of these are necessary, though, because of the annotation-based configuration on @RabbitListener.

In the RabbitMQ management console I see the consumers [screenshot], whose state is mainly idle.

From my Spring Boot logs I can see that hardly any threading is taking place (the thread name in the log only changes after dozens of lines, whereas I would expect it to switch much more often).

The queue as shown in the management console:

[screenshot]

I would expect the state of the consumers on the channel page to be "running" much more of the time.

Can anyone please enlighten me?

Thanks in advance!

EDIT: After enabling debug logging for spring-amqp, I see that multiple messages are being received, but afterwards the output of the threads named "..Container#0-n" only appears en bloc. I would have expected the output of the multiple running container threads to be interleaved. Could this be a thread pool issue?

Here is a (noise reduced) log excerpt:

2020-03-05 08:15:21.418  INFO 32960 --- [ntContainer#0-7] d.i.clearing.rabbit.RabbitMQListener     : Received message: {436}
2020-03-05 08:15:21.418  INFO 32960 --- [ntContainer#0-5] d.i.clearing.rabbit.RabbitMQListener     : Received message: {450}
2020-03-05 08:15:21.418  INFO 32960 --- [tContainer#0-12] d.i.clearing.rabbit.RabbitMQListener     : Received message: {414}
2020-03-05 08:15:21.418  INFO 32960 --- [tContainer#0-11] d.i.clearing.rabbit.RabbitMQListener     : Received message: {418}
2020-03-05 08:15:21.418  INFO 32960 --- [ntContainer#0-8] d.i.clearing.rabbit.RabbitMQListener     : Received message: {432}
2020-03-05 08:15:21.418  INFO 32960 --- [ntContainer#0-2] d.i.clearing.rabbit.RabbitMQListener     : Received message: {456}
2020-03-05 08:15:21.418  INFO 32960 --- [tContainer#0-15] d.i.clearing.rabbit.RabbitMQListener     : Received message: {469}
2020-03-05 08:15:21.419  INFO 32960 --- [tContainer#0-16] d.i.clearing.rabbit.RabbitMQListener     : Received message: {394}
2020-03-05 08:15:21.418  INFO 32960 --- [tContainer#0-13] d.i.clearing.rabbit.RabbitMQListener     : Received message: {409}
2020-03-05 08:15:21.419  INFO 32960 --- [ntContainer#0-4] d.i.clearing.rabbit.RabbitMQListener     : Received message: {452}
2020-03-05 08:15:21.418  INFO 32960 --- [ntContainer#0-9] d.i.clearing.rabbit.RabbitMQListener     : Received message: {431}
2020-03-05 08:15:21.419  INFO 32960 --- [tContainer#0-10] d.i.clearing.rabbit.RabbitMQListener     : Received message: {420}
2020-03-05 08:15:21.419  INFO 32960 --- [tContainer#0-14] d.i.clearing.rabbit.RabbitMQListener     : Received message: {350}
2020-03-05 08:15:21.419  INFO 32960 --- [ntContainer#0-1] d.i.clearing.rabbit.RabbitMQListener     : Received message: {400}
2020-03-05 08:15:21.419  INFO 32960 --- [ntContainer#0-3] d.i.clearing.rabbit.RabbitMQListener     : Received message: {453}
2020-03-05 08:15:21.419  INFO 32960 --- [ntContainer#0-6] d.i.clearing.rabbit.RabbitMQListener     : Received message: {449}
... Logging Noise removed
2020-03-05 08:15:26.409  INFO 32960 --- [ntContainer#0-3] de.idslogistik.clearing.api.p2.Clearing  : Calculating {7183042} - {8330}/{5500}  
2020-03-05 08:15:26.411  INFO 32960 --- [ntContainer#0-3] de.idslogistik.clearing.api.p2.Clearing  : ... Logging Noise removed
2020-03-05 08:15:26.412 DEBUG 32960 --- [ntContainer#0-3] de.idslogistik.clearing.api.p2.Clearing  : ... Logging Noise removed
2020-03-05 08:15:26.413  INFO 32960 --- [ntContainer#0-3] de.idslogistik.clearing.api.p2.Clearing  : ... Logging Noise removed
2020-03-05 08:15:26.507  INFO 32960 --- [ntContainer#0-3] de.idslogistik.clearing.api.p2.Clearing  : ... Logging Noise removed
2020-03-05 08:15:26.669  WARN 32960 --- [ntContainer#0-3] de.idslogistik.clearing.api.p2.Rddel     : ... Logging Noise removed
2020-03-05 08:15:26.669  INFO 32960 --- [ntContainer#0-3] de.idslogistik.clearing.api.p2.Rddel     : ... Logging Noise removed
2020-03-05 08:15:33.012 DEBUG 32960 --- [ntContainer#0-3] de.idslogistik.clearing.api.p2.Rddel     : ... Logging Noise removed
2020-03-05 08:15:33.013 DEBUG 32960 --- [ntContainer#0-3] d.i.i.util.clearing.VerSendungTools      : ... Logging Noise removed
2020-03-05 08:15:33.013 DEBUG 32960 --- [ntContainer#0-3] d.i.i.util.clearing.VerSendungTools      : ... Logging Noise removed
2020-03-05 08:15:33.013 DEBUG 32960 --- [ntContainer#0-3] d.i.i.util.clearing.VerSendungTools      : ... Logging Noise removed
2020-03-05 08:15:33.014 DEBUG 32960 --- [ntContainer#0-3] d.i.i.util.clearing.VerSendungTools      : ... Logging Noise removed

Upvotes: 5

Views: 11817

Answers (1)

Ming

Reputation: 71

prefetch = 1 means that each consumer may hold at most one unacknowledged message at a time. Until that message is acknowledged, the broker does not deliver the next message to that consumer.
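
To make the effect of the prefetch limit concrete, here is a minimal toy model in plain Java. It is not RabbitMQ or Spring AMQP client code; the class and method names are hypothetical, and it assumes the limit is applied per consumer (as with a non-global basic.qos). It only counts how many messages a broker could hand out before the first acknowledgement arrives.

```java
/** Toy model of a per-consumer prefetch limit; not RabbitMQ client code. */
class PrefetchSketch {

    /**
     * Counts how many messages can be delivered before any ack arrives,
     * assuming each consumer may hold at most `prefetch` unacked messages.
     */
    static int inFlightBeforeFirstAck(int consumers, int prefetch, int queued) {
        int[] unacked = new int[consumers]; // unacked messages per consumer
        int delivered = 0;
        boolean progress = true;
        while (delivered < queued && progress) {
            progress = false;
            // Round-robin over the consumers, skipping those with a full window.
            for (int c = 0; c < consumers && delivered < queued; c++) {
                if (unacked[c] < prefetch) {
                    unacked[c]++;
                    delivered++;
                    progress = true;
                }
            }
        }
        return delivered;
    }

    public static void main(String[] args) {
        // 8 consumers with prefetch 1: one message in flight per consumer.
        System.out.println(inFlightBeforeFirstAck(8, 1, 1000)); // prints 8
        // A single consumer with prefetch 1 is limited to one message.
        System.out.println(inFlightBeforeFirstAck(1, 1, 1000)); // prints 1
    }
}
```

Under this assumption, prefetch = 1 by itself would not prevent eight consumers from each working on one message at the same time.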

Alternatively, consider increasing the number of consumers to speed up message processing; see https://www.rabbitmq.com/tutorials/tutorial-two-java.html
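
As a rough sketch of both knobs in the question's own application.properties: the property names are the ones already used in the question, while the values 16 and 10 are purely illustrative assumptions, not tuned recommendations.

```properties
spring.rabbitmq.listener.type=simple
# Start with 8 consumers and allow scaling up to 16 (illustrative values)
spring.rabbitmq.listener.simple.concurrency=8
spring.rabbitmq.listener.simple.max-concurrency=16
# Let each consumer hold up to 10 unacknowledged messages (illustrative value)
spring.rabbitmq.listener.simple.prefetch=10
```

Note that the concurrency attribute on @RabbitListener overrides the container factory default for that listener, so the "8-8" in the question's annotation would have to be changed or removed for the concurrency properties to take effect.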

Upvotes: 1
