Anyanwu
Anyanwu

Reputation: 1

How can I configure (AMQP) RabbitMq to effectively serialize complex object types?

I want consume a message object of type

 Map<Integer,List<Object>>,

but the

rabbitmq 

was finding it difficult to properly serialize the object. I am sorry if this solution has been provided before, but I could not get a solution for my case. This is thrown exception:


org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Listener method 'void com.edugreat.akademiksresource.chat.consumer.ChatConsumer.publichPreviousChatsMessages(java.util.Map<java.lang.Integer, java.util.List<com.edugreat.akademiksresource.chat.dto.ChatDTO>>)' threw exception
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:287) ~[spring-rabbit-3.1.7.jar:3.1.7]
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandlerAndProcessResult(MessagingMessageListenerAdapter.java:225) ~[spring-rabbit-3.1.7.jar:3.1.7]
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:149) ~[spring-rabbit-3.1.7.jar:3.1.7]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1682) ~[spring-rabbit-3.1.7.jar:3.1.7]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1604) ~[spring-rabbit-3.1.7.jar:3.1.7]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1592) ~[spring-rabbit-3.1.7.jar:3.1.7]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1583) ~[spring-rabbit-3.1.7.jar:3.1.7]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListenerAndHandleException(AbstractMessageListenerContainer.java:1528) ~[spring-rabbit-3.1.7.jar:3.1.7]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.lambda$executeListener$8(AbstractMessageListenerContainer.java:1506) ~[spring-rabbit-3.1.7.jar:3.1.7]
    at io.micrometer.observation.Observation.observe(Observation.java:499) ~[micrometer-observation-1.13.3.jar:1.13.3]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1506) ~[spring-rabbit-3.1.7.jar:3.1.7]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1082) ~[spring-rabbit-3.1.7.jar:3.1.7]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1018) ~[spring-rabbit-3.1.7.jar:3.1.7]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1421) ~[spring-rabbit-3.1.7.jar:3.1.7]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1322) ~[spring-rabbit-3.1.7.jar:3.1.7]
    at java.base/java.lang.Thread.run(Thread.java:840) ~[na:na]
Caused by: java.lang.ClassCastException: class java.lang.String cannot be cast to class java.lang.Integer (java.lang.String and java.lang.Integer are in module java.base of loader 'bootstrap')
    at com.edugreat.akademiksresource.chat.consumer.ChatConsumer.publichPreviousChatsMessages(ChatConsumer.java:94) ~[classes/:na]
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) ~[na:na]
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
    at java.base/java.lang.reflect.Method.invoke(Method.java:569) ~[na:na]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:169) ~[spring-messaging-6.1.12.jar:6.1.12]
    at org.springframework.amqp.rabbit.listener.adapter.KotlinAwareInvocableHandlerMethod.doInvoke(KotlinAwareInvocableHandlerMethod.java:45) ~[spring-rabbit-3.1.7.jar:3.1.7]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:119) ~[spring-messaging-6.1.12.jar:6.1.12]
    at org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:75) ~[spring-rabbit-3.1.7.jar:3.1.7]
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:278) ~[spring-rabbit-3.1.7.jar:3.1.7]
    ... 15 common frames omitted

[2m2024-12-08T17:12:04.064+01:00[0;39m [33m WARN[0;39m [35m14059[0;39m [2m---[0;39m [2m[ntContainer#1-1][0;39m [2m[0;39m[36mingErrorHandler$DefaultExceptionStrategy[0;39m [2m:[0;39m Fatal message conversion error; message rejected; it will be dropped or routed to a dead letter exchange, if so configured: (Body:'[B@74c001b3(byte[12445])' MessageProperties [headers={__ContentTypeId__=java.lang.Object, __KeyTypeId__=java.lang.Object, __TypeId__=java.util.ImmutableCollections$Map1}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=rabbitmqexchange, receivedRoutingKey=previouschatroutingkey, deliveryTag=1, consumerTag=amq.ctag-w0X0omaQgnwqzS7X792LQw, consumerQueue=previouschatqueue])
[2m2024-12-08T17:12:04.064+01:00[0;39m [31mERROR[0;39m [35m14059[0;39m [2m---[0;39m [2m[ntContainer#1-1][0;39m [2m[0;39m[36mo.s.a.r.l.SimpleMessageListenerContainer[0;39m [2m:[0;39m Execution of Rabbit message listener failed, and the error handler threw an exception

org.springframework.amqp.AmqpRejectAndDontRequeueException: Error Handler converted exception to fatal
    at org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler.handleError(ConditionalRejectingErrorHandler.java:147) ~[spring-rabbit-3.1.7.jar:3.1.7]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeErrorHandler(AbstractMessageListenerContainer.java:1475) ~[spring-rabbit-3.1.7.jar:3.1.7]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.handleListenerException(AbstractMessageListenerContainer.java:1768) ~[spring-rabbit-3.1.7.jar:3.1.7]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListenerAndHandleException(AbstractMessageListenerContainer.java:1549) ~[spring-rabbit-3.1.7.jar:3.1.7]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.lambda$executeListener$8(AbstractMessageListenerContainer.java:1506) ~[spring-rabbit-3.1.7.jar:3.1.7]
    at io.micrometer.observation.Observation.observe(Observation.java:499) ~[micrometer-observation-1.13.3.jar:1.13.3]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1506) ~[spring-rabbit-3.1.7.jar:3.1.7]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1082) ~[spring-rabbit-3.1.7.jar:3.1.7]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1018) ~[spring-rabbit-3.1.7.jar:3.1.7]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1421) ~[spring-rabbit-3.1.7.jar:3.1.7]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1322) ~[spring-rabbit-3.1.7.jar:3.1.7]
    at java.base/java.lang.Thread.run(Thread.java:840) ~[na:na]
Caused by: org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Listener method 'void com.edugreat.akademiksresource.chat.consumer.ChatConsumer.publichPreviousChatsMessages(java.util.Map<java.lang.Integer, java.util.List<com.edugreat.akademiksresource.chat.dto.ChatDTO>>)' threw exception
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:287) ~[spring-rabbit-3.1.7.jar:3.1.7]
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandlerAndProcessResult(MessagingMessageListenerAdapter.java:225) ~[spring-rabbit-3.1.7.jar:3.1.7]
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:149) ~[spring-rabbit-3.1.7.jar:3.1.7]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1682) ~[spring-rabbit-3.1.7.jar:3.1.7]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1604) ~[spring-rabbit-3.1.7.jar:3.1.7]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1592) ~[spring-rabbit-3.1.7.jar:3.1.7]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1583) ~[spring-rabbit-3.1.7.jar:3.1.7]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListenerAndHandleException(AbstractMessageListenerContainer.java:1528) ~[spring-rabbit-3.1.7.jar:3.1.7]
    ... 8 common frames omitted
Caused by: java.lang.ClassCastException: class java.lang.String cannot be cast to class java.lang.Integer (java.lang.String and java.lang.Integer are in module java.base of loader 'bootstrap')
    at com.edugreat.akademiksresource.chat.consumer.ChatConsumer.publichPreviousChatsMessages(ChatConsumer.java:94) ~[classes/:na]
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) ~[na:na]
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
    at java.base/java.lang.reflect.Method.invoke(Method.java:569) ~[na:na]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:169) ~[spring-messaging-6.1.12.jar:6.1.12]
    at org.springframework.amqp.rabbit.listener.adapter.KotlinAwareInvocableHandlerMethod.doInvoke(KotlinAwareInvocableHandlerMethod.java:45) ~[spring-rabbit-3.1.7.jar:3.1.7]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:119) ~[spring-messaging-6.1.12.jar:6.1.12]
    at org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:75) ~[spring-rabbit-3.1.7.jar:3.1.7]
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:278) ~[spring-rabbit-3.1.7.jar:3.1.7]
    

POJO definition:

@Data
public class ChatDTO {
    
    @Min(value = 0)
    private Integer id;
    @Min(value = 0)
    private Integer groupId;
    
    @Min(value = 0)
    private Integer senderId;
    
    @Transient
    private String senderName;
@Transient
    private long onlineMembers;
    
    @NotNull(message = "chat message is missing")
    @NotEmpty(message = "chat message is missing")
    private String content;
    
    private LocalDateTime sentAt;

    public ChatDTO(@Min(0) Integer id, @Min(0) Integer groupId, @Min(0) Integer senderId,
            @NotNull(message = "chat message is missing") @NotEmpty(message = "chat message is missing") String content,
            LocalDateTime sentAt) {
    
        this.id = id;
        this.groupId = groupId;
        this.senderId = senderId;
        this.content = content;
        this.sentAt = sentAt;
    }
    
    
}

Consumer service:

@Service
public class ChatConsumer implements ChatConsumerInterface {

    @Autowired
    private GroupMembersDao groupMembersDao;

    @Autowired
    private GroupChatDao groupChatDao;
    
    private final Map<Integer, SseEmitter> emitters = new ConcurrentHashMap<>();

    private static final Logger LOGGER = LoggerFactory.getLogger(ChatConsumer.class);

@RabbitListener(queues = "${previous.chat.queue}")
    void publichPreviousChatsMessages(Map<Integer, List<ChatDTO>> chats) {
        
        
        Integer studentId = chats.keySet().stream().toList().get(0);
        

//      get IDs of group members
        List<Integer> memberIds = groupMembersDao.getMemberIds(chats.get(studentId).get(0).getGroupId());
        
        final long currentlyOnline = currentlyOnlineMembers(memberIds);
        
        
        
//      check if the user is online
        if(emitters.containsKey(studentId)) {
            chats.get(studentId).stream().forEach(chat -> chat.setOnlineMembers(currentlyOnline));
            try {
                emitters.get(studentId).send(SseEmitter.event().data(chats.get(studentId)).name("chats"));
            } catch (IOException e) {
                
                emitters.remove(studentId);
                System.out.println("unable to send previous chat: "+e);
            }
        }
        
        
    }

Message producer:

@Service
public class ChatBroadCastingService implements ChatBroadcaster {

@Autowired
private RabbitTemplate rabbitTemplate;

private static final Logger LOGGER = LoggerFactory.getLogger(ChatBroadCastingService.class);


@Override
    public void previousChatMessages(Map<Integer, List<ChatDTO>> chats) {
        
        
        
        if(chats != null && chats.size() > 0) {
            
                  LOGGER.info(String.format("Produce chat %s", chats.toString()));
            rabbitTemplate.convertAndSend(exchange, previousChatRoutingKey, chats);
        }
        
        
    }

Config class:


@Configuration
public class RabbitMQConfig {

 
    @Bean
     TopicExchange exchange() {
        
        return new TopicExchange(exchange);
    }
    

    @Bean
    Binding previousChatBinding() {
        
        return BindingBuilder.bind(previousChatQueue()).to(exchange()).with(previousChatRoutinKey);
    }
   
    
    @Bean
    MessageConverter converter() {
        
        return new Jackson2JsonMessageConverter();
    }
    
   
    @Bean
    AmqpTemplate amqpTemplate(ConnectionFactory connectionFactory) {
        
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        
        rabbitTemplate.setMessageConverter(converter());
        
        return rabbitTemplate;
    }

I expect the AmqpTemplate to convert and send an object of Map of key Integer and value List<ChatDTO>, but it could not. The LOGGER output actually was serialized to the proper type.

Upvotes: 0

Views: 25

Answers (0)

Related Questions