JJ Zabkar
JJ Zabkar

Reputation: 3689

Spring AMQP Set prefetchCount on Queue Bean

How do I set the prefetchCount on for this queue consumer?

@Bean
public Queue eventQueue(AmqpAdmin amqpAdmin) {
    Queue queue = QueueBuilder.durable(EVENT_QUEUE_NAME)
            ...
            .build();

    TopicExchange topicExchange = new TopicExchange(TOPIC_EXCHANGE, true, false);

    amqpAdmin.declareBinding(BindingBuilder
            .bind(queue)
            .to(topicExchange)
            .with(EVENT_ROUTING_KEY));

    return queue;
}

The documentation notes that prefetchCount this is a container configuration, but setting it on my factory bean did not work and the value defaulted to 250:

    @Bean
    public SimpleRabbitListenerContainerFactory containerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setPrefetchCount(10); // doesn't work; defaults to 250
        return factory;
    }

UPDATE

Per @GaryRussell's comment below, I did test out the default rabbitListenerContainerFactory and also validated my Spring Boot configuration spring.rabbitmq.listener.simple.prefetch was being consumed in AbstractRabbitListenerContainerFactoryConfigurer. However, when I look at the queue consumers in RabbitMQ, I can see that the queues that I define using the default container setup still have a prefetchCount of 250:

rabbit admin shows consumer prefetch 250

I'm using the RabbitMQ admin panel as the source of truth. I don't think it's lying, because I have a bunch of dynamic queues that are instantiated with custom containers, and those do have non-default (correct) prefetchCounts. I also validated in the Spring container startup that there was only the one (expected) rabbitListenerContainerFactory bean.

Upvotes: 2

Views: 2256

Answers (1)

Gary Russell
Gary Russell

Reputation: 174574

Prefetch is not a queue property, it's a consumer property.

What does your listener look like?

You are using a non-standard name for the container factory.

You either need to add containerFactory property to the @RabbitListener or you need to rename your bean to rabbitListenerContainerFactory (overriding Boot's defined factory @Bean).

Also

amqpAdmin.declareBinding(BindingBuilder

You should not be talking to the broker in a bean definition - it's too early.

Simply add your queue, exchange, and binding as @Beans and You can also just set the prefetch in the application.properties/yaml file (if you are using Spring Boot). The admin will find them and declare them when the connection is first opened.

EDIT

There's someething else going on...

@SpringBootApplication
public class So62049769Application {

    public static void main(String[] args) {
        SpringApplication.run(So62049769Application.class, args);
    }

    @Bean
    public Queue queue() {
        return new Queue("so62049769");
    }

    @RabbitListener(queues = "so62049769")
    public void listen(String in) {
        System.out.println(in);
    }

}
spring.rabbitmq.listener.simple.prefetch=42

enter image description here

Upvotes: 2

Related Questions