Hadi Rifaii
Hadi Rifaii

Reputation: 41

rabbitmq multiple event types in one queue

I am new to RabbitMQ and currently endeavoring to construct a dummy application to simulate a real project I am engaged in. This project involves two microservices, each written in a different language, communicating via RabbitMQ. There are various types of events being transmitted and received by each service. The precise order of processing these messages is crucial, and I have only one consumer for each producer. Based on my understanding, I should be able to ensure that these messages are processed in the same order they were produced.

To simulate this application, I've built two separate Spring Boot projects that are connected to the same RabbitMQ server.

Here's the configuration in my producer app:

package com.jchaaban.rabbitmqproducer.config;

import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMqConfig {

    @Value("${spring.rabbitmq.host}")
    String host;
    @Value("${spring.rabbitmq.username}")
    String username;
    @Value("${spring.rabbitmq.password}")
    String password;

    @Value("${spring.rabbitmq.eventsQueueName}")
    private String queueName;

    @Bean
    public Queue eventsQueue() {
        return new Queue(queueName, true);
    }

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(host);
        cachingConnectionFactory.setUsername(username);
        cachingConnectionFactory.setPassword(password);
        return cachingConnectionFactory;
    }

    @Bean
    public MessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(jsonMessageConverter());
        return rabbitTemplate;
    }
}

This is how i am sending messages in the producer side:

package com.jchaaban.rabbitmqproducer.service;

import com.jchaaban.rabbitmqproducer.event.MessageCreatedEvent;
import com.jchaaban.rabbitmqproducer.event.MessageDeletedEvent;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

@Component
public class RabbitMqSender {
    private final RabbitTemplate rabbitTemplate;
    public RabbitMqSender(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    @Value("${spring.rabbitmq.eventsQueueName}")
    private String queueName;

    public void sendMessageCreatedEvent(MessageCreatedEvent event) {
        rabbitTemplate.convertSendAndReceive(queueName, event, messagePostProcessor -> {
            messagePostProcessor.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
            messagePostProcessor.getMessageProperties().setHeader("eventType", "MessageCreatedEvent");
            return messagePostProcessor;
        });

    }

    public void sendMessageDeletedEvent(MessageDeletedEvent event) {
        rabbitTemplate.convertAndSend(queueName, event, messagePostProcessor -> {
            messagePostProcessor.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
            messagePostProcessor.getMessageProperties().setHeader("eventType", "MessageDeletedEvent");
            return messagePostProcessor;
        });
    }
}

This is the configuration on my consumer side:

package com.jchaaban.rabbitmqcosumer.config;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.json.JsonMapper;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMqConfig {

    @Value("${spring.rabbitmq.username}")
    private String username;

    @Value("${spring.rabbitmq.password}")
    private String password;

    @Value("${spring.rabbitmq.host}")
    private String host;

    @Value("${spring.rabbitmq.eventsQueueName}")
    private String queueName;

    @Bean
    public Queue eventsQueue() {
        return new Queue(queueName, true);
    }

    @Bean
    ObjectMapper objectMapper(){
        return JsonMapper.builder().findAndAddModules().build();
    }
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(host);
        cachingConnectionFactory.setUsername(username);
        cachingConnectionFactory.setPassword(password);
        return cachingConnectionFactory;
    }

    @Bean
    public MessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(jsonMessageConverter());
        return rabbitTemplate;
    }
}

And this is how i am trying to consume the message sent from the producer on the consumer side:

package com.jchaaban.rabbitmqcosumer.service;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.jchaaban.rabbitmqcosumer.event.MessageCreatedEvent;
import com.jchaaban.rabbitmqcosumer.event.MessageDeletedEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class RabbitMqReceiver {
    private static final Logger logger = LoggerFactory.getLogger(RabbitMqReceiver.class);

    @Autowired
    private ObjectMapper objectMapper;
    
    @RabbitListener(queues = "${spring.rabbitmq.eventsQueueName}")
    public void receiveMessage(Message message) {
        try {
            String eventType = message.getMessageProperties().getHeader("eventType");
            Object event = deserializeEvent(message.getBody(), eventType);
            handleEvent(event);
        } catch (Exception ignored) {}
    }

    private Object deserializeEvent(byte[] body, String eventType) throws Exception {
        if ("MessageCreatedEvent".equals(eventType)) {
            return objectMapper.readValue(body, MessageCreatedEvent.class);
        } else if ("MessageDeletedEvent".equals(eventType)) {
            return objectMapper.readValue(body, MessageDeletedEvent.class);
        } else {
            throw new IllegalArgumentException("Unknown event type");
        }
    }

    private void handleEvent(Object event) {
        if (event instanceof MessageCreatedEvent) {
            handleCreationEvent((MessageCreatedEvent) event);
        } else if (event instanceof MessageDeletedEvent) {
            handleDeletionEvent((MessageDeletedEvent) event);
        }
    }
    private void handleCreationEvent(MessageCreatedEvent event) {
        logger.info("Received Message Creation Event: {}", event);
    }
    
    private void handleDeletionEvent(MessageDeletedEvent event) {
        logger.info("Received Message Deletion Event: {}", event);
    }
}

What i am trying to do is to check the eventType header of the message to know the type of the event i received, then use an objectMapper to convert the body of the message to the Event type i need, But when i try this appreach i get the following error:

org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Failed to convert message at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:158) ~[spring-rabbit-3.1.2.jar:3.1.2] at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1682) ~[spring-rabbit-3.1.2.jar:3.1.2] at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1604) ~[spring-rabbit-3.1.2.jar:3.1.2] at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1592) ~[spring-rabbit-3.1.2.jar:3.1.2] at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1583) ~[spring-rabbit-3.1.2.jar:3.1.2] at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListenerAndHandleException(AbstractMessageListenerContainer.java:1528) ~[spring-rabbit-3.1.2.jar:3.1.2] at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.lambda$executeListener$8(AbstractMessageListenerContainer.java:1506) ~[spring-rabbit-3.1.2.jar:3.1.2] at io.micrometer.observation.Observation.observe(Observation.java:499) ~[micrometer-observation-1.12.3.jar:1.12.3] at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1506) ~[spring-rabbit-3.1.2.jar:3.1.2] at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1084) ~[spring-rabbit-3.1.2.jar:3.1.2] at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1020) ~[spring-rabbit-3.1.2.jar:3.1.2] at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1422) ~[spring-rabbit-3.1.2.jar:3.1.2] at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1324) ~[spring-rabbit-3.1.2.jar:3.1.2] at java.base/java.lang.Thread.run(Thread.java:1623) ~[na:na] Caused by: org.springframework.amqp.support.converter.MessageConversionException: failed to resolve class name. Class not found [com.jchaaban.rabbitmqproducer.event.MessageDeletedEvent] at org.springframework.amqp.support.converter.DefaultJackson2JavaTypeMapper.getClassIdType(DefaultJackson2JavaTypeMapper.java:189) ~[spring-amqp-3.1.2.jar:3.1.2] at org.springframework.amqp.support.converter.DefaultJackson2JavaTypeMapper.fromTypeHeader(DefaultJackson2JavaTypeMapper.java:146) ~[spring-amqp-3.1.2.jar:3.1.2] at org.springframework.amqp.support.converter.DefaultJackson2JavaTypeMapper.toJavaType(DefaultJackson2JavaTypeMapper.java:122) ~[spring-amqp-3.1.2.jar:3.1.2] at org.springframework.amqp.support.converter.AbstractJackson2MessageConverter.convertContent(AbstractJackson2MessageConverter.java:394) ~[spring-amqp-3.1.2.jar:3.1.2] at org.springframework.amqp.support.converter.AbstractJackson2MessageConverter.doFromMessage(AbstractJackson2MessageConverter.java:364) ~[spring-amqp-3.1.2.jar:3.1.2] at org.springframework.amqp.support.converter.AbstractJackson2MessageConverter.fromMessage(AbstractJackson2MessageConverter.java:321) ~[spring-amqp-3.1.2.jar:3.1.2] at org.springframework.amqp.support.converter.AbstractJackson2MessageConverter.fromMessage(AbstractJackson2MessageConverter.java:304) ~[spring-amqp-3.1.2.jar:3.1.2] at org.springframework.amqp.rabbit.listener.adapter.AbstractAdaptableMessageListener.extractMessage(AbstractAdaptableMessageListener.java:342) ~[spring-rabbit-3.1.2.jar:3.1.2] at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter$MessagingMessageConverterAdapter.extractPayload(MessagingMessageListenerAdapter.java:385) ~[spring-rabbit-3.1.2.jar:3.1.2] at org.springframework.amqp.support.converter.MessagingMessageConverter.fromMessage(MessagingMessageConverter.java:132) ~[spring-amqp-3.1.2.jar:3.1.2] at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.toMessagingMessage(MessagingMessageListenerAdapter.java:258) ~[spring-rabbit-3.1.2.jar:3.1.2] at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:148) ~[spring-rabbit-3.1.2.jar:3.1.2] ... 13 common frames omitted Caused by: java.lang.ClassNotFoundException: com.jchaaban.rabbitmqproducer.event.MessageDeletedEvent at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:641) ~[na:na] at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:188) ~[na:na] at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:521) ~[na:na] at java.base/java.lang.Class.forName0(Native Method) ~[na:na] at java.base/java.lang.Class.forName(Class.java:496) ~[na:na] at java.base/java.lang.Class.forName(Class.java:475) ~[na:na] at org.springframework.util.ClassUtils.forName(ClassUtils.java:304) ~[spring-core-6.1.4.jar:6.1.4] at org.springframework.amqp.support.converter.DefaultJackson2JavaTypeMapper.getClassIdType(DefaultJackson2JavaTypeMapper.java:185) ~[spring-amqp-3.1.2.jar:3.1.2] ... 24 common frames omitted

I think this is happening because the MessageDeletedEvent cannot be converted to a org.springframework.amqp.core.Message object, i tryed to use a MessageDeletedEvent Object on the consumer side that has the same fields as the MessageDeletedEvent on the producer side and this worked, but this is not what i need i want to be able to recieve more than one type of event.

Is there any solution for this problem?

Thank you

Upvotes: 0

Views: 263

Answers (1)

Chasca
Chasca

Reputation: 311

Class not found [com.jchaaban.rabbitmqproducer.event.MessageDeletedEvent]

It seems your receiver code might use the producer dependency as compile only and it is not included in the classpath of your spring boot app.

Upvotes: 0

Related Questions