Reputation: 14232
Here is my @Configuration:
@Bean
public AmqpAdmin amqpAdmin()
{
    RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory());
    DirectExchange dirExchange = new DirectExchange("evtExchange", true, false);
    rabbitAdmin.declareExchange(dirExchange);
    rabbitAdmin.declareQueue(processQueue);
    Binding processBinding = BindingBuilder.bind(processQueue)
            .to(dirExchange).with("rkey.process");
    rabbitAdmin.declareBinding(processBinding);
    return rabbitAdmin;
}

@Bean
public RabbitTemplate rabbitTemplate()
{
    RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
    return rabbitTemplate;
}

@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory()
{
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory());
    SimpleMessageListenerContainer container = factory
            .createListenerContainer();
    factory.setConcurrentConsumers(50);
    factory.setMaxConcurrentConsumers(100);
    container.setStartConsumerMinInterval(3000);
    container.setQueues(processQueue);
    factory.setAdviceChain(retryInterceptor());
    return factory;
}

@Bean
public RetryOperationsInterceptor retryInterceptor()
{
    return RetryInterceptorBuilder.stateless().maxAttempts(5)
            .backOffOptions(1000, 2.0, 10000)
            .recoverer(new RejectAndDontRequeueRecoverer())
            .build();
}

@Bean
public ProcessQueueListener processListener()
{
    return new ProcessQueueListener();
}

@Bean
public ProcessQueueListener processListener2()
{
    return new ProcessQueueListener();
}

@Bean
public ProcessQueueListener processListener3()
{
    return new ProcessQueueListener();
}
And here is the @RabbitListener class:
@RabbitListener(containerFactory = "rabbitListenerContainerFactory", queues = "process")
public class ProcessQueueListener
{
    public ProcessQueueListener()
    {
    }

    @RabbitHandler
    void receiveMessage(String message)
    {
        // doSomething
    }
}
Only when I instantiate processListener(), processListener2() and processListener3() separately do I start to see multiple consumers in the RabbitMQ Admin for the process queue, with each listener processing messages. Otherwise I see only one consumer, despite calling setConcurrentConsumers().
Is there an elegant way to declare multiple listeners on demand, and to increase or decrease their number based on need? Or is declaring multiple @Beans the only option? Or am I doing something wrong?
Upvotes: 0
Views: 5234
Reputation: 174729
What version are you using?
I just copied your container factory and it works fine for me (2.1.3).
BTW, starting with version 2.0, you can add the concurrency attribute to the @RabbitListener annotation, and it will override any value set in the container factory:
/**
* Set the concurrency of the listener container for this listener. Overrides the
* default set by the listener container factory. Maps to the concurrency setting of
* the container type.
* <p>For a
* {@link org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer
* SimpleMessageListenerContainer} if this value is a simple integer, it sets a fixed
* number of consumers in the {@code concurrentConsumers} property. If it is a string
* with the form {@code "m-n"}, the {@code concurrentConsumers} is set to {@code m}
* and the {@code maxConcurrentConsumers} is set to {@code n}.
* <p>For a
* {@link org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer
* DirectMessageListenerContainer} it sets the {@code consumersPerQueue} property.
* @return the concurrency.
* @since 2.0
*/
String concurrency() default "";
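As a rough sketch of how that might look on the listener from the question (assuming Spring AMQP 2.0 or later, and that @EnableRabbit or Spring Boot auto-configuration is active; the @Component registration and the "50-100" range simply mirror the values in the question and are not a recommendation):

```java
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

// A single listener bean; the container behind it runs the consumers.
// "50-100" maps to concurrentConsumers=50 and maxConcurrentConsumers=100
// for a SimpleMessageListenerContainer (see the Javadoc above).
@Component
@RabbitListener(containerFactory = "rabbitListenerContainerFactory",
        queues = "process",
        concurrency = "50-100")
public class ProcessQueueListener
{
    @RabbitHandler
    public void receiveMessage(String message)
    {
        // doSomething
    }
}
```

With something like this in place, one ProcessQueueListener bean should be enough; the extra processListener2() and processListener3() beans would no longer be needed to get more consumers.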
Also, unrelated, but you should not call rabbitAdmin.declareExchange(dirExchange) in a bean declaration; it is too early in the application context lifecycle to connect to RabbitMQ. Add the exchange, queue and binding as @Beans and the admin will find and declare them automatically.
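A minimal sketch of that arrangement follows. The class name AmqpTopologyConfig is hypothetical, a ConnectionFactory bean is assumed to exist elsewhere in the context, and the queue is assumed to be a durable queue named "process" (the question does not show how processQueue is created):

```java
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class AmqpTopologyConfig
{
    // The admin only holds the connection factory here; it makes no
    // declare* calls while the bean is being created.
    @Bean
    public RabbitAdmin amqpAdmin(ConnectionFactory connectionFactory)
    {
        return new RabbitAdmin(connectionFactory);
    }

    // Durable, non-auto-delete exchange, as in the question.
    @Bean
    public DirectExchange evtExchange()
    {
        return new DirectExchange("evtExchange", true, false);
    }

    // Assumption: durable queue named "process".
    @Bean
    public Queue processQueue()
    {
        return new Queue("process", true);
    }

    @Bean
    public Binding processBinding(Queue processQueue, DirectExchange evtExchange)
    {
        return BindingBuilder.bind(processQueue).to(evtExchange).with("rkey.process");
    }
}
```

The RabbitAdmin picks up the Exchange, Queue and Binding beans from the application context and declares them once a connection is opened, so nothing talks to the broker during bean creation.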
Upvotes: 1