Spring Boot RabbitMQ Publisher and Receiver On the Same Project

I have a microservice-like application that should send messages to and receive messages from other applications (microservices). The application has several publishers, each publishing to a specific queue, and several subscriber classes, each subscribing to exactly one queue. Unfortunately, my subscriber classes are consuming the same messages I publish. How should I fix this?

Here is my code:

a) Publisher 1 - does not have a listener method since it only publishes to my.queues.queue1

@Configuration
public class RabbitQueue1Publisher{
    private static final String QUEUE_NAME = "my.queues.queue1";

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory("http://127.0.0.1:1675");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        return connectionFactory;
    }

    @Bean
    public Queue simpleQueue() {
        return new Queue(QUEUE_NAME);
    }

    @Bean
    public MessageConverter jsonMessageConverter(){
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(connectionFactory());
        template.setRoutingKey(QUEUE_NAME);
        template.setMessageConverter(jsonMessageConverter());
        return template;
    }
}

b) Publisher 2 - also does not have a listener method since it only publishes to my.queues.queue2

@Configuration
public class RabbitQueue2Publisher{
    private static final String QUEUE_NAME = "my.queues.queue2";

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory("http://127.0.0.1:1675");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        return connectionFactory;
    }

    @Bean
    public Queue simpleQueue() {
        return new Queue(QUEUE_NAME);
    }

    @Bean
    public MessageConverter jsonMessageConverter(){
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(connectionFactory());
        template.setRoutingKey(QUEUE_NAME);
        template.setMessageConverter(jsonMessageConverter());
        return template;
    }
}

c) Consumer 1 - consumes from queue3. Has a listener method

@Configuration
public class RabbitQueue3Subscriber{
    private static final String QUEUE_NAME = "my.queue.queue3";
    
    @Autowired
    private Queue3Listener Queue3Listener;

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory("http://127.0.0.1:15672");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        return connectionFactory;
    }

    @Bean
    public Queue simpleQueue() {
        return new Queue(QUEUE_NAME);
    }

    @Bean
    public MessageConverter jsonMessageConverter(){
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(connectionFactory());
        template.setRoutingKey(QUEUE_NAME);
        template.setMessageConverter(jsonMessageConverter());
        return template;
    }

    @Bean
    public SimpleMessageListenerContainer userListenerContainer() {
        SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer();
        listenerContainer.setConnectionFactory(connectionFactory());
        listenerContainer.setQueues(simpleQueue());
        listenerContainer.setMessageConverter(jsonMessageConverter());
        listenerContainer.setMessageListener(Queue3Listener);
        listenerContainer.setAcknowledgeMode(AcknowledgeMode.AUTO);
        return listenerContainer;
    }
}

d) Consumer 2 - consumes from queue4. Has a listener method

@Configuration
public class RabbitQueue4Subscriber{
    private static final String QUEUE_NAME = "my.queue.queue4";
    
    @Autowired
    private Queue4Listener Queue4Listener;

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory("http://127.0.0.1:15672");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        return connectionFactory;
    }

    @Bean
    public Queue simpleQueue() {
        return new Queue(QUEUE_NAME);
    }

    @Bean
    public MessageConverter jsonMessageConverter(){
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(connectionFactory());
        template.setRoutingKey(QUEUE_NAME);
        template.setMessageConverter(jsonMessageConverter());
        return template;
    }

    @Bean
    public SimpleMessageListenerContainer userListenerContainer() {
        SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer();
        listenerContainer.setConnectionFactory(connectionFactory());
        listenerContainer.setQueues(simpleQueue());
        listenerContainer.setMessageConverter(jsonMessageConverter());
        listenerContainer.setMessageListener(Queue4Listener);
        listenerContainer.setAcknowledgeMode(AcknowledgeMode.AUTO);
        return listenerContainer;
    }
}

Although I publish to and consume from different queues, I end up consuming the same messages I produce. Can someone point out what I am doing wrong or suggest the right way to do this?

Answers (1)

Amit Phaltankar

Here is how it works for me. I have a publisher and a consumer for RabbitMQ. It doesn't matter whether they are part of the same project or different ones.

Publisher:

Publisher Configuration

@Configuration
class PublisherConfig{
    String queueName = "com.queueName";
    String routingKey = "com.routingKey";
    String exchange = "com.exchangeName";

    @Bean
    Queue queue() {
        return new Queue(queueName, false);
    }

    @Bean
    TopicExchange exchange() {
        return new TopicExchange(exchange);
    }

    @Bean
    Binding binding(Queue queueFoo, TopicExchange exchange) {
        return BindingBuilder.bind(queueFoo).to(exchange).with(routingKey);
    }

    // Required only if you want to send a custom object as the payload
    @Bean
    public MappingJackson2MessageConverter jackson2Converter() {
        return new MappingJackson2MessageConverter();
    }
}

Publish Message

    @Autowired private RabbitMessagingTemplate rabbitMessagingTemplate;
    @Autowired private MappingJackson2MessageConverter mappingJackson2MessageConverter;

    rabbitMessagingTemplate.setMessageConverter(this.mappingJackson2MessageConverter);
    rabbitMessagingTemplate.convertAndSend(exchange, routingKey, employObj);

Consumer

Consumer Configuration

@Configuration
public class RabbitMQConfiguration implements RabbitListenerConfigurer {
    public MappingJackson2MessageConverter jackson2Converter() {
        return new MappingJackson2MessageConverter();
    }

    @Bean
    public DefaultMessageHandlerMethodFactory handlerMethodFactory() {
        DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
        factory.setMessageConverter(jackson2Converter());
        return factory;
    }

    @Override
    public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
        registrar.setMessageHandlerMethodFactory(handlerMethodFactory());
    }
}

Listen for a Message

    @RabbitListener(queues = "com.queueName")
    public void receiveMessage(Employee employee) {
        // More code
    }

You can encapsulate the publisher and listener configurations in two different @Configuration classes.

Hope this helps you


P.S. The OP asked for an explanation. Here it is:

Exchange and Routing Key

A publisher publishes a message to an exchange with a particular routing key. The routing key identifies what type of message it is. For example, send all user-logged-in messages with the routing key 'user_logged_in' and all email-sent messages with 'email_sent'.

Queue

A queue is bound to an exchange with a routing key. Every message published to that exchange with a matching routing key is delivered to the queue and stays there until it is consumed.

A consumer then connects explicitly to such a queue and listens for messages.

So the queue name in the publisher config and the consumer config has to be the same.
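
To make the relationship between exchange, routing key, and queue concrete, here is a minimal in-memory sketch in plain Java. It is not the RabbitMQ or Spring AMQP API: the class RoutingSketch, its methods, and the queue names queue.logins and queue.emails are made up for illustration, and routing keys are matched exactly (no topic wildcards).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Toy in-memory model of exchange -> binding -> queue routing.
// Hypothetical names throughout; this is not the RabbitMQ client API.
public class RoutingSketch {

    // Maps a routing key to the names of the queues bound with that key.
    private final Map<String, List<String>> bindings = new HashMap<>();
    // Holds the messages waiting in each named queue.
    private final Map<String, Queue<String>> queues = new HashMap<>();

    // Declares the queue if it does not exist and binds it under the routing key.
    public void bind(String queueName, String routingKey) {
        queues.computeIfAbsent(queueName, name -> new ArrayDeque<>());
        bindings.computeIfAbsent(routingKey, key -> new ArrayList<>()).add(queueName);
    }

    // Publisher side: copies the message into every queue bound with this exact key.
    // A message whose key matches no binding is dropped.
    public void publish(String routingKey, String message) {
        List<String> targets = bindings.getOrDefault(routingKey, Collections.emptyList());
        for (String queueName : targets) {
            queues.get(queueName).add(message);
        }
    }

    // Consumer side: takes the next message from the named queue, or null if it is empty or unknown.
    public String receive(String queueName) {
        Queue<String> queue = queues.get(queueName);
        return queue == null ? null : queue.poll();
    }

    public static void main(String[] args) {
        RoutingSketch exchange = new RoutingSketch();
        exchange.bind("queue.logins", "user_logged_in");
        exchange.bind("queue.emails", "email_sent");

        exchange.publish("user_logged_in", "alice logged in");
        exchange.publish("email_sent", "welcome mail sent to alice");

        // Each consumer only sees what was routed to the queue it reads from.
        System.out.println(exchange.receive("queue.logins"));
        System.out.println(exchange.receive("queue.emails"));
        System.out.println(exchange.receive("queue.logins"));
    }
}
```

In this model, the consumer of queue.logins never sees the email message, and its second receive call returns null because that queue is empty by then. A consumer gets a message only if it reads from a queue that the publisher's routing key is bound to.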

Once your publisher is up, you can visit the RabbitMQ management dashboard and inspect the exchange, routing key, and queue to see how it works.
