Matija Župančić
Matija Župančić

Reputation: 1140

Spring RabbitMQ - caching multiple channels per connection?

the default caching strategy using Spring AMQP is to create a single connection with multiple channels. The number of created channels depends on the concurrency of the listeners.

This creates an issue for us because we have lots of listeners with a concurrency of only a few consumers. Rabbit documentation recommends using a single-digit number of channels per connection.

When we have a few listeners, let's say 10, and each of them needs a concurrency of 5. This creates a single connection with 50 channels which is far more than recommended.

I have tried playing with spring.rabbitmq.cache properties but I cannot figure out how to set it up so each listener uses its own connection with 5 channels. I would prefer to create 10 connections with 5 channels over 1 with 50.

Can someone point me in the right direction?

Upvotes: 2

Views: 1504

Answers (1)

Gary Russell
Gary Russell

Reputation: 174554

The simplest solution is to give each container its own connection factory.

You can add a ContainerCustomizer to the container factory and use it to change the connection factory for each container. It will be called after the container is created and before it is started.

EDIT

@SpringBootApplication
public class So64913992Application {

    public static void main(String[] args) {
        SpringApplication.run(So64913992Application.class, args);
    }

    @RabbitListener(queues = "foo", concurrency = "5")
    public void listen1(String in) {
        System.out.println(in);
    }

    @RabbitListener(queues = "bar", concurrency = "5")
    public void listen2(String in) {
        System.out.println(in);
    }

}

@Component
class Customizer {

    Customizer(GenericApplicationContext context, CachingConnectionFactory cf,
            SimpleRabbitListenerContainerFactory factory) {

        factory.setContainerCustomizer(container -> {
            CachingConnectionFactory newCf = new CachingConnectionFactory(cf.getRabbitConnectionFactory());
            String name = "cf." + container.getQueueNames()[0];
            newCf.setConnectionNameStrategy(c -> name);
            // set any other CCF properties you need
            context.registerBean(name, CachingConnectionFactory.class, () -> newCf);
            context.getBean(name); // initialize
            container.setConnectionFactory(newCf);
        });
    }

}

Upvotes: 1

Related Questions