Prabhu R

Reputation: 14232

Elegant way to add multiple @RabbitListener beans to a ContainerFactory

Here is my @Configuration

    @Bean
    public AmqpAdmin amqpAdmin()
    {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory());

        DirectExchange dirExchange = new DirectExchange("evtExchange", true,
                false);

        rabbitAdmin.declareExchange(dirExchange);
        rabbitAdmin.declareQueue(processQueue);
        Binding processBinding = BindingBuilder.bind(processQueue)
                .to(dirExchange).with("rkey.process");
        rabbitAdmin.declareBinding(processBinding);

        return rabbitAdmin;
    }

    @Bean
    public RabbitTemplate rabbitTemplate()
    {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
        return rabbitTemplate;
    }

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory()
    {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory());
        SimpleMessageListenerContainer container = factory
                .createListenerContainer();
        factory.setConcurrentConsumers(50);
        factory.setMaxConcurrentConsumers(100);
        container.setStartConsumerMinInterval(3000);
        container.setQueues(processQueue);
        factory.setAdviceChain(retryInterceptor());
        return factory;
    }

    @Bean
    public RetryOperationsInterceptor retryInterceptor()
    {
        return RetryInterceptorBuilder.stateless().maxAttempts(5)
                .backOffOptions(1000, 2.0, 10000).recoverer(new RejectAndDontRequeueRecoverer()).build();
    }

    @Bean
    public ProcessQueueListener processListener()
    {
        return new ProcessQueueListener();
    }

    @Bean
    public ProcessQueueListener processListener2()
    {
        return new ProcessQueueListener();
    }

    @Bean
    public ProcessQueueListener processListener3()
    {
        return new ProcessQueueListener();
    }

And here is the @RabbitListener class

@RabbitListener(containerFactory = "rabbitListenerContainerFactory", queues = "process")
public class ProcessQueueListener
{

    public ProcessQueueListener()
    {
    }

    @RabbitHandler
    void receiveMessage(String message)
    {
        // doSomething
    }

} 

I only see multiple consumers on the process queue in the RabbitMQ Admin, with each listener processing messages, when I instantiate processListener(), processListener2() and processListener3() as separate beans. Otherwise I see just one consumer, even though I call setConcurrentConsumers().

Is there an elegant way to declare multiple listeners on demand and scale them up and down as needed? Or is declaring multiple @Beans the only option? Or am I doing something wrong?

Upvotes: 0

Views: 5234

Answers (1)

Gary Russell

Reputation: 174729

What version are you using?

I just copied your container factory and it works fine for me (2.1.3)...


BTW, starting with version 2.0, you can set the concurrency attribute on the @RabbitListener, and it will override any value in the container factory.

/**
 * Set the concurrency of the listener container for this listener. Overrides the
 * default set by the listener container factory. Maps to the concurrency setting of
 * the container type.
 * <p>For a
 * {@link org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer
 * SimpleMessageListenerContainer} if this value is a simple integer, it sets a fixed
 * number of consumers in the {@code concurrentConsumers} property. If it is a string
 * with the form {@code "m-n"}, the {@code concurrentConsumers} is set to {@code m}
 * and the {@code maxConcurrentConsumers} is set to {@code n}.
 * <p>For a
 * {@link org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer
 * DirectMessageListenerContainer} it sets the {@code consumersPerQueue} property.
 * @return the concurrency.
 * @since 2.0
 */
String concurrency() default "";
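
As a rough illustration of the attribute described in that Javadoc, the listener from the question could carry its own concurrency setting. This is only a sketch: it assumes Spring AMQP 2.0 or later, and the "50-100" value is an assumption chosen to mirror the concurrentConsumers and maxConcurrentConsumers values in the question's factory.

```java
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;

// Sketch only: the concurrency attribute needs Spring AMQP 2.0 or later.
// "50-100" is an assumed value: 50 initial consumers, growing up to 100.
@RabbitListener(
        containerFactory = "rabbitListenerContainerFactory",
        queues = "process",
        concurrency = "50-100")
public class ProcessQueueListener {

    @RabbitHandler
    public void receiveMessage(String message) {
        // doSomething
    }
}
```

With a setting like this, a single ProcessQueueListener bean should be enough; the container behind it manages the consumer count, so the extra processListener2() and processListener3() beans would not be needed.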

Also, unrelated: you should not call rabbitAdmin.declareExchange(dirExchange) (or the other declare methods) inside a bean declaration; it is too early in the application context lifecycle to connect to RabbitMQ. Instead, add the exchange, queue and binding as @Beans, and the admin will find and declare them automatically.
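
A minimal sketch of that arrangement might look like the following. The class name RabbitTopologyConfig is hypothetical, a ConnectionFactory bean is assumed to be defined elsewhere, and the queue name "process" is taken from the listener's queues attribute since the question does not show how processQueue is created.

```java
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitTopologyConfig { // hypothetical class name

    // The admin makes no broker calls here; it declares the beans below
    // once a connection is opened.
    @Bean
    public RabbitAdmin amqpAdmin(ConnectionFactory connectionFactory) {
        return new RabbitAdmin(connectionFactory);
    }

    // Durable, non-auto-delete direct exchange, as in the question.
    @Bean
    public DirectExchange evtExchange() {
        return new DirectExchange("evtExchange", true, false);
    }

    // Assumed queue definition; the single-argument constructor creates
    // a durable, non-exclusive, non-auto-delete queue.
    @Bean
    public Queue processQueue() {
        return new Queue("process");
    }

    // Same routing key as the binding in the question.
    @Bean
    public Binding processBinding(Queue processQueue, DirectExchange evtExchange) {
        return BindingBuilder.bind(processQueue).to(evtExchange).with("rkey.process");
    }
}
```

Nothing in this configuration talks to the broker while the beans are being created, which is the point of the advice above.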

Upvotes: 1
