Reputation: 375
I have a microservice-style application that should send messages to, and receive messages from, other applications (microservices). It has several publishers, each publishing to a specific queue, and several subscriber classes, each subscribing to exactly one queue. Unfortunately, my subscriber classes are consuming the same messages I publish. How should I fix this?
Here is my code:
a) Publisher 1 - does not have a listener method since it only publishes to my.queues.queue1
@Configuration
public class RabbitQueue1Publisher{
private static final String QUEUE_NAME = "my.queues.queue1";
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory("http://127.0.0.1:1675");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
return connectionFactory;
}
@Bean
public Queue simpleQueue() {
return new Queue(QUEUE_NAME);
}
@Bean
public MessageConverter jsonMessageConverter(){
return new Jackson2JsonMessageConverter();
}
@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory());
template.setRoutingKey(QUEUE_NAME);
template.setMessageConverter(jsonMessageConverter());
return template;
}
}
b) Publisher 2 - also does not have a listener method since it only publishes to my.queues.queue2
@Configuration
public class RabbitQueue2Publisher{
private static final String QUEUE_NAME = "my.queues.queue2";
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory("http://127.0.0.1:1675");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
return connectionFactory;
}
@Bean
public Queue simpleQueue() {
return new Queue(QUEUE_NAME);
}
@Bean
public MessageConverter jsonMessageConverter(){
return new Jackson2JsonMessageConverter();
}
@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory());
template.setRoutingKey(QUEUE_NAME);
template.setMessageConverter(jsonMessageConverter());
return template;
}
}
c) Consumer 1 - consumes from queue3. Has a listener method
@Configuration
public class RabbitQueue3Subscriber{
private static final String QUEUE_NAME = "my.queue.queue3";
@Autowired
private Queue3Listener Queue3Listener;
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory("http://127.0.0.1:15672");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
return connectionFactory;
}
@Bean
public Queue simpleQueue() {
return new Queue(QUEUE_NAME);
}
@Bean
public MessageConverter jsonMessageConverter(){
return new Jackson2JsonMessageConverter();
}
@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory());
template.setRoutingKey(QUEUE_NAME);
template.setMessageConverter(jsonMessageConverter());
return template;
}
@Bean
public SimpleMessageListenerContainer userListenerContainer() {
SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer();
listenerContainer.setConnectionFactory(connectionFactory());
listenerContainer.setQueues(simpleQueue());
listenerContainer.setMessageConverter(jsonMessageConverter());
listenerContainer.setMessageListener(Queue3Listener);
listenerContainer.setAcknowledgeMode(AcknowledgeMode.AUTO);
return listenerContainer;
}
}
d) Consumer 2 - consumes from queue4. Has a listener method
@Configuration
public class RabbitQueue4Subscriber{
private static final String QUEUE_NAME = "my.queue.queue4";
@Autowired
private Queue4Listener Queue4Listener;
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory("http://127.0.0.1:15672");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
return connectionFactory;
}
@Bean
public Queue simpleQueue() {
return new Queue(QUEUE_NAME);
}
@Bean
public MessageConverter jsonMessageConverter(){
return new Jackson2JsonMessageConverter();
}
@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory());
template.setRoutingKey(QUEUE_NAME);
template.setMessageConverter(jsonMessageConverter());
return template;
}
@Bean
public SimpleMessageListenerContainer userListenerContainer() {
SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer();
listenerContainer.setConnectionFactory(connectionFactory());
listenerContainer.setQueues(simpleQueue());
listenerContainer.setMessageConverter(jsonMessageConverter());
listenerContainer.setMessageListener(Queue4Listener);
listenerContainer.setAcknowledgeMode(AcknowledgeMode.AUTO);
return listenerContainer;
}
}
Although I am publishing to and consuming from different queues, I end up consuming the same messages I produce. Can someone point out what I am doing wrong, or suggest the right way to do it?
Upvotes: 0
Views: 1881
Reputation: 3424
Here is how it works for me. I have a publisher and a consumer for RabbitMQ. It doesn't matter whether they are part of the same project or different ones.
Publisher:
Publisher Configuration
@Configuration
class PublisherConfig{
String queueName = "com.queueName";
String routingKey = "com.routingKey";
String exchange = "com.exchangeName";
@Bean
Queue queue() {
return new Queue(queueName, false);
}
@Bean
TopicExchange exchange() {
return new TopicExchange(exchange);
}
@Bean
Binding binding(Queue queueFoo, TopicExchange exchange) {
return BindingBuilder.bind(queueFoo).to(exchange).with(routingKey);
}
// Required only if you want to send a custom object as the payload
@Bean
public MappingJackson2MessageConverter jackson2Converter() {
return new MappingJackson2MessageConverter();
}
}
Publish Message
@Autowired private RabbitMessagingTemplate rabbitMessagingTemplate;
@Autowired private MappingJackson2MessageConverter mappingJackson2MessageConverter;
rabbitMessagingTemplate.setMessageConverter(this.mappingJackson2MessageConverter);
rabbitMessagingTemplate.convertAndSend(exchange, routingKey, employObj);
Consumer
Consumer Configuration
@Configuration
public class RabbitMQConfiguration implements RabbitListenerConfigurer {
public MappingJackson2MessageConverter jackson2Converter() {
return new MappingJackson2MessageConverter();
}
@Bean
public DefaultMessageHandlerMethodFactory handlerMethodFactory() {
DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
factory.setMessageConverter(jackson2Converter());
return factory;
}
@Override
public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
registrar.setMessageHandlerMethodFactory(handlerMethodFactory());
}
}
Listen for a Message
@RabbitListener(queues = "com.queueName")
public void receiveMessage(Employee employee) {
// More code
}
You can encapsulate the publisher and listener configurations in two different @Configuration classes.
Hope this helps you
P.S. The OP asked for an explanation. Here it is:
Exchange and Routing Key
The publisher publishes a message to an exchange with a particular routing key. The routing key helps differentiate what type of message it is.
Suppose:
Send all "user logged in" messages with the routing key 'user_logged_in'.
Send all "email sent" messages with the routing key 'email_sent'.
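Since the publisher configuration above declares a TopicExchange, the key a queue is bound with can also be a pattern rather than an exact key. As I understand RabbitMQ's topic rules, words are separated by dots, `*` stands for exactly one word and `#` for zero or more words. Here is a rough sketch of that matching in plain Java, with no broker involved. `TopicKeyMatcher` is a made-up helper for illustration, not a Spring AMQP or RabbitMQ class, and the dotted keys such as `user.logged_in` are hypothetical.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper (not a Spring AMQP class): mimics topic-exchange key matching. */
final class TopicKeyMatcher {

    private TopicKeyMatcher() {}

    /** Returns true if the dot-separated routing key matches the binding pattern. */
    static boolean matches(String pattern, String routingKey) {
        // limit -1 keeps trailing empty words instead of silently dropping them
        List<String> patternWords = Arrays.asList(pattern.split("\\.", -1));
        List<String> keyWords = Arrays.asList(routingKey.split("\\.", -1));
        return matches(patternWords, 0, keyWords, 0);
    }

    private static boolean matches(List<String> p, int pi, List<String> k, int ki) {
        if (pi == p.size()) {
            // pattern exhausted: a match only if the key is exhausted too
            return ki == k.size();
        }
        String word = p.get(pi);
        if (word.equals("#")) {
            // '#' may absorb zero or more words, so try every possible split point
            for (int next = ki; next <= k.size(); next++) {
                if (matches(p, pi + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (ki == k.size()) {
            return false; // pattern still expects a word but the key has none left
        }
        if (word.equals("*") || word.equals(k.get(ki))) {
            return matches(p, pi + 1, k, ki + 1);
        }
        return false;
    }
}
```

With exact keys like 'user_logged_in' and 'email_sent' there are no wildcards, so a topic exchange behaves like a plain key comparison: `TopicKeyMatcher.matches("user_logged_in", "email_sent")` is false.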
Queue:
Once the routing key is associated with the exchange, a queue comes into play. The queue is bound to the exchange with that routing key, and every message published with a matching routing key sits in this queue until it is consumed.
The consumer then connects explicitly to that queue and listens for messages.
So the queue name in the publisher config and the consumer config has to be the same.
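To make the exchange, routing key and queue relationship concrete, here is a minimal in-memory sketch in plain Java. It is only a model: `InMemoryBroker` and the queue names `logins` and `emails` are invented for illustration, it routes on exact keys only, and it does not talk to RabbitMQ at all.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

/** Hypothetical in-memory stand-in for a broker; it models exact-key routing only. */
final class InMemoryBroker {
    // queue name -> messages waiting in that queue
    private final Map<String, Queue<String>> queues = new HashMap<>();
    // routing key -> names of the queues bound with that key
    private final Map<String, List<String>> bindings = new HashMap<>();

    void declareQueue(String queueName) {
        queues.computeIfAbsent(queueName, name -> new ArrayDeque<>());
    }

    /** Binds a queue to the (single, implicit) exchange with the given routing key. */
    void bind(String queueName, String routingKey) {
        declareQueue(queueName);
        bindings.computeIfAbsent(routingKey, key -> new ArrayList<>()).add(queueName);
    }

    /** Publisher side: the message is copied to every queue bound with this key. */
    void publish(String routingKey, String message) {
        List<String> targets = bindings.getOrDefault(routingKey, Collections.<String>emptyList());
        for (String queueName : targets) {
            queues.get(queueName).add(message);
        }
    }

    /** Consumer side: takes the next message from the named queue, or null if none. */
    String consume(String queueName) {
        Queue<String> queue = queues.get(queueName);
        return queue == null ? null : queue.poll();
    }
}
```

In this model, after `bind("logins", "user_logged_in")` and `bind("emails", "email_sent")`, a message published with 'user_logged_in' is only visible to a consumer reading `logins`. A consumer that asks for a queue name the publisher side never bound gets nothing, which is why both sides have to agree on the name.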
Once your publisher is up, you can open the RabbitMQ management dashboard and inspect the exchange, routing key, and queue to see how it works.
Upvotes: 0