Reputation: 5420
I know it is possible to make SimpleMessageListenerContainer
bean and set prefetch count and message listener here, like this:
@Bean
public SimpleMessageListenerContainer messageListenerContainer(
ConnectionFactory rabbitConnectionFactory,
Receiver receiver) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(rabbitConnectionFactory);
container.setQueueNames("hello");
container.setMessageListener(new MessageListenerAdapter(receiver, "receive"));
container.setPrefetchCount(1000);
return container;
}
But how to set prefetch count for channel if I want to use declarative approach using @RabbitListener
?
@Component
public class Receiver {
private static final Logger log = LoggerFactory.getLogger(Receiver.class);
@RabbitListener(queues = "hello") // how to set prefetch count here?
public void receive(String message) {
log.info(" [x] Received '{}'.", message);
}
}
It is not possible?
Upvotes: 14
Views: 18445
Reputation: 34695
If you want to use different prefetch for each queue you can create listener factory with default spring setting and override just prefetch.
@Bean
public RabbitListenerContainerFactory<SimpleMessageListenerContainer> prefetchRabbitListenerContainerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
configurer.configure(factory, connectionFactory);
factory.setPrefetchCount(prefetch);
return factory;
}
Upvotes: 1
Reputation: 91
I'm using Spring boot 2.3.3 and I changed the following in application.properties and it worked.
spring.rabbitmq.listener.direct.prefetch=1000
spring.rabbitmq.listener.simple.prefetch=1000
I don't know the difference between direct and simple so I set both.
Upvotes: 4
Reputation: 480
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
int concurrentConsumers = Integer.parseInt(PropertiesLoader.getProperty("queue.concurrent-consumers"));
int maxConcurrentConsumers = Integer.parseInt(PropertiesLoader.getProperty("queue.max-concurrent-consumers"));
int consecutiveActiveTrigger = Integer.parseInt(PropertiesLoader.getProperty("queue.consecutive-active-trigger"));
int prefectCount = Integer.parseInt(PropertiesLoader.getProperty("queue.prefetch_count"));
factory.setPrefetchCount(prefectCount);
factory.setConcurrentConsumers(concurrentConsumers);
factory.setMaxConcurrentConsumers(maxConcurrentConsumers);
factory.setConsecutiveActiveTrigger(consecutiveActiveTrigger);
return factory;
}
Upvotes: 0
Reputation: 5420
Solution according to @artem-bilan answer:
Declare RabbitListenerContainerFactory
bean with prefetch count 10 in some @Configuration
class:
@Bean
public RabbitListenerContainerFactory<SimpleMessageListenerContainer> prefetchTenRabbitListenerContainerFactory(ConnectionFactory rabbitConnectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(rabbitConnectionFactory);
factory.setPrefetchCount(10);
return factory;
}
Receiver
bean uses this factory bean:
@Component
public class Receiver {
private static final Logger log = LoggerFactory.getLogger(Receiver.class);
@RabbitListener(queues = "hello", containerFactory = "prefetchTenRabbitListenerContainerFactory")
public void receive(String message) {
log.info(" [x] Received '{}'.", message);
}
@RabbitListener(queues = "hello")
public void receiveWithoutPrefetch(String message) {
log.info(" [x] Received without prefetch '{}'.", message);
}
}
Two listeners here is just for demo purpose.
With this configuration Spring creates two AMQP channels. One for each @RabbitListener
. First with prefetch count 10 using our new prefetchTenRabbitListenerContainerFactory
bean and second with prefetch count 1 using default rabbitListenerContainerFactory
bean.
Upvotes: 22
Reputation: 121282
The @RabbitListener
has containerFactory
option:
/**
* The bean name of the {@link org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory}
* to use to create the message listener container responsible to serve this endpoint.
* <p>If not specified, the default container factory is used, if any.
* @return the {@link org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory}
* bean name.
*/
String containerFactory() default "";
Where you can configure SimpleRabbitListenerContainerFactory
with the desired prefetchCount
and the target SimpleMessageListenerContainer
for that annotation will have that option for you.
Upvotes: 5