VelNaga

Reputation: 3953

Handling Connections in Spring-Boot-RabbitMQ

Hi, I am developing a Spring-boot-RabbitMQ application (version 1.6) and have a few questions. I have read the docs and browsed other Stack Overflow questions, but a few things are still not clear to me (possibly because of my bad memory). It would be great if someone could answer my questions.

1) Currently I have 4 producers and 4 consumers. A producer may produce millions of messages or events, so I think using a single connection for both producers and consumers will block the consumers from consuming messages. My idea is to create separate connections for producers and consumers so that neither blocks the other, which should also give some performance improvement. Is this approach correct?

2) I am using CachingConnectionFactory to create connections via SimpleRabbitListenerContainerFactory. Does each call to this factory return a new connection? And if we use CachingConnectionFactory, do we really need to write separate connection factories for the producer and the consumer? Please find my code below.

1) Configuration class

@Configuration
@EnableRabbit
public class RabbitMqConfiguration{

@Autowired
private CachingConnectionFactory cachingConnectionFactory;

@Value("${concurrent.consumers}")
public int concurrent_consumers;

@Value("${max.concurrent.consumers}")
public int max_concurrent_consumers;

 @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(cachingConnectionFactory);
        factory.setConcurrentConsumers(concurrent_consumers);
        factory.setMaxConcurrentConsumers(max_concurrent_consumers);
        factory.setMessageConverter(jsonMessageConverter());
        return factory;
    }

@Bean
public MessageConverter jsonMessageConverter()
{
    final Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
    return converter;
}

}

2) Producer class

@Configuration
public class TaskProducerConfiguration extends RabbitMqConfiguration {

@Value("${queue1}")
public String queue1;

@Value("${queue2}")
public String queue2;

@Value("${queue3}")
public String queue3;

@Value("${queue4}")
public String queue4;

@Value("${spring.rabbit.exchange}")
public String exchange;

@Autowired
private CachingConnectionFactory cachingConnectionFactory;

@Primary
@Bean
public RabbitTemplate getQueue1Template()
{
    RabbitTemplate template = new RabbitTemplate(cachingConnectionFactory);
    template.setRoutingKey(this.queue1);
    template.setMessageConverter(jsonMessageConverter());
    return template;
}

@Bean
public RabbitTemplate getQueue2Template()
{
    RabbitTemplate template = new RabbitTemplate(cachingConnectionFactory);
    template.setRoutingKey(this.queue2);
    template.setMessageConverter(jsonMessageConverter());
    return template;
}

@Bean
public RabbitTemplate getQueue3Template()
{
    RabbitTemplate template = new RabbitTemplate(cachingConnectionFactory);
    template.setRoutingKey(this.queue3);
    template.setMessageConverter(jsonMessageConverter());
    return template;
}

@Bean
public RabbitTemplate getQueue4Template()
{
    RabbitTemplate template = new RabbitTemplate(cachingConnectionFactory);
    template.setRoutingKey(this.queue4);
    template.setMessageConverter(jsonMessageConverter());
    return template;
}
@Bean(name="queue1Bean")
public Queue queue1()
{
    return new Queue(this.queue1);
}

@Bean(name="queue2Bean")
public Queue queue2()
{
    return new Queue(this.queue2);
}

@Bean(name="queue3Bean")
public Queue queue3()
{
    return new Queue(this.queue3);
}

@Bean(name="queue4Bean")
public Queue queue4()
{
    return new Queue(this.queue4);
}

@Bean
TopicExchange exchange() {
    return new TopicExchange(exchange);
}

@Bean
List<Binding> bindings(Queue queue1Bean,Queue queue2Bean,Queue queue3Bean,Queue queue4Bean, TopicExchange exchange) {
    List<Binding> bindingList = new ArrayList<Binding>();
    bindingList.add(BindingBuilder.bind(queue1Bean).to(exchange).with(this.queue1));
    bindingList.add(BindingBuilder.bind(queue2Bean).to(exchange).with(this.queue2));
    bindingList.add(BindingBuilder.bind(queue3Bean).to(exchange).with(this.queue3));
    bindingList.add(BindingBuilder.bind(queue4Bean).to(exchange).with(this.queue4));
    return bindingList;
}

}

3) Receiver class (only one receiver class is shown; the other three are identical except for the queue name and routing key).

@Component
public class Queue1Receiver {

@Autowired
private TaskProducer taskProducer;

@Value("${queue1}")
public String queue1;

@RabbitListener(id="queue1",containerFactory="rabbitListenerContainerFactory",queues = "#{queue1Bean}")
public void handleQueue1Message(TaskMessage taskMessage,@Header(AmqpHeaders.CONSUMER_QUEUE) String queue)
{
    System.out.println("Queue::"+queue);
    System.out.println("CustomerId: " + taskMessage.getCustomerID());
    if(taskMessage.isHasQueue2()){
        taskProducer.sendQueue2Message(taskMessage);
    }
    if(taskMessage.isHasQueue3()){
        taskProducer.sendQueue3Message(taskMessage);
    }
    if(taskMessage.isHasQueue4()){
        taskProducer.sendQueue4Message(taskMessage);
    }
}

@Bean
public Queue queue1Bean() {
    // This queue has the following properties:
    // name: my_durable,durable: true,exclusive: false,auto_delete: false
    return new Queue(queue1, true, false, false);
}

}

Your help would be appreciated.

Note: Downvoters, please leave a comment before downvoting so that I can avoid the mistake in the future.

Edited based on comments by Gary Russell:

1) RabbitMqConfiguration

@Configuration
@EnableRabbit
public class RabbitMqConfiguration{

@Value("${concurrent.consumers}")
public int concurrent_consumers;

@Value("${max.concurrent.consumers}")
public int max_concurrent_consumers;

 @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory());
        factory.setConcurrentConsumers(concurrent_consumers);
        factory.setMaxConcurrentConsumers(max_concurrent_consumers);
        factory.setMessageConverter(jsonMessageConverter());
        return factory;
    }

@Bean
public CachingConnectionFactory connectionFactory()
{
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
    connectionFactory.setUsername("guest");
    connectionFactory.setPassword("guest");
    connectionFactory.setCacheMode(CacheMode.CONNECTION);
    return connectionFactory;
}


@Bean
public MessageConverter jsonMessageConverter()
{
    final Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
    return converter;
}


}

Upvotes: 4

Views: 6714

Answers (2)

Mohd Arshil

Reputation: 305

You can use connection pooling in this case; keeping the pool size appropriate may solve the problem. As the other answer notes, the producer and consumer are both using the same connection, so pooling might help you out instead.
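For illustration, here is a minimal sketch of what pooling could look like with CachingConnectionFactory, assuming the same localhost/guest broker settings as in the question's edit. The class name and the cache sizes (4 and 10) are placeholders picked for this example, not tuned recommendations.

```java
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory.CacheMode;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class PooledConnectionSketch { // hypothetical class name

    @Bean
    public CachingConnectionFactory connectionFactory() {
        // Host and credentials copied from the question's edit; adjust for your broker.
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");

        // CONNECTION mode caches several connections instead of sharing a single one.
        connectionFactory.setCacheMode(CacheMode.CONNECTION);

        // Assumed sizes: how many idle connections, and idle channels per connection,
        // are kept in the cache for reuse.
        connectionFactory.setConnectionCacheSize(4);
        connectionFactory.setChannelCacheSize(10);
        return connectionFactory;
    }
}
```

By default these sizes control how many idle connections and channels are kept open for reuse rather than acting as a hard cap on how many can be created. Also, as far as I know, automatic declaration of queues, exchanges and bindings is not supported when the cache mode is CONNECTION, which matters if you rely on the Queue and Binding beans from the question being declared for you.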

Upvotes: 0

Gary Russell

Reputation: 174494

"using a single connection for both producer & consumer will block consumer to consume the messages"

What leads you to believe that? A single connection will generally be fine. If you really want separate connections, change the connection factory cacheMode to CONNECTION.
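If separate connections are still wanted, another way to get them is to give the listener container factory and the RabbitTemplate a CachingConnectionFactory each. This is a different technique from the cacheMode change and is only sketched here under assumptions. The class and bean names are made up for the example; host and credentials are copied from the question's edit.

```java
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;

@Configuration
public class SeparateConnectionsSketch { // hypothetical class name

    // Connection factory used only by the listener containers (consumers).
    @Bean
    @Primary
    public ConnectionFactory consumerConnectionFactory() {
        return newFactory();
    }

    // Connection factory used only by the RabbitTemplate (producers).
    @Bean
    public ConnectionFactory producerConnectionFactory() {
        return newFactory();
    }

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
            @Qualifier("consumerConnectionFactory") ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        return factory;
    }

    @Bean
    public RabbitTemplate rabbitTemplate(
            @Qualifier("producerConnectionFactory") ConnectionFactory connectionFactory) {
        return new RabbitTemplate(connectionFactory);
    }

    // Helper for this sketch: default cache mode (CHANNEL), so each factory
    // keeps one shared connection and caches channels on it.
    private static CachingConnectionFactory newFactory() {
        CachingConnectionFactory cf = new CachingConnectionFactory("localhost");
        cf.setUsername("guest");
        cf.setPassword("guest");
        return cf;
    }
}
```

With this layout publishing and consuming go over two different connections, while each factory still multiplexes its own channels over a single connection. The @Primary marker is only there so that anything injecting a plain ConnectionFactory gets an unambiguous bean.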

Upvotes: 6
