Reputation: 1
I want to consume a message object of type
Map<Integer,List<Object>>,
but
rabbitmq
has difficulty properly deserializing the object. I am sorry if a solution has been provided before, but I could not find one for my case. This is the exception thrown:
org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Listener method 'void com.edugreat.akademiksresource.chat.consumer.ChatConsumer.publichPreviousChatsMessages(java.util.Map<java.lang.Integer, java.util.List<com.edugreat.akademiksresource.chat.dto.ChatDTO>>)' threw exception
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:287) ~[spring-rabbit-3.1.7.jar:3.1.7]
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandlerAndProcessResult(MessagingMessageListenerAdapter.java:225) ~[spring-rabbit-3.1.7.jar:3.1.7]
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:149) ~[spring-rabbit-3.1.7.jar:3.1.7]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1682) ~[spring-rabbit-3.1.7.jar:3.1.7]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1604) ~[spring-rabbit-3.1.7.jar:3.1.7]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1592) ~[spring-rabbit-3.1.7.jar:3.1.7]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1583) ~[spring-rabbit-3.1.7.jar:3.1.7]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListenerAndHandleException(AbstractMessageListenerContainer.java:1528) ~[spring-rabbit-3.1.7.jar:3.1.7]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.lambda$executeListener$8(AbstractMessageListenerContainer.java:1506) ~[spring-rabbit-3.1.7.jar:3.1.7]
at io.micrometer.observation.Observation.observe(Observation.java:499) ~[micrometer-observation-1.13.3.jar:1.13.3]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1506) ~[spring-rabbit-3.1.7.jar:3.1.7]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1082) ~[spring-rabbit-3.1.7.jar:3.1.7]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1018) ~[spring-rabbit-3.1.7.jar:3.1.7]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1421) ~[spring-rabbit-3.1.7.jar:3.1.7]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1322) ~[spring-rabbit-3.1.7.jar:3.1.7]
at java.base/java.lang.Thread.run(Thread.java:840) ~[na:na]
Caused by: java.lang.ClassCastException: class java.lang.String cannot be cast to class java.lang.Integer (java.lang.String and java.lang.Integer are in module java.base of loader 'bootstrap')
at com.edugreat.akademiksresource.chat.consumer.ChatConsumer.publichPreviousChatsMessages(ChatConsumer.java:94) ~[classes/:na]
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) ~[na:na]
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
at java.base/java.lang.reflect.Method.invoke(Method.java:569) ~[na:na]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:169) ~[spring-messaging-6.1.12.jar:6.1.12]
at org.springframework.amqp.rabbit.listener.adapter.KotlinAwareInvocableHandlerMethod.doInvoke(KotlinAwareInvocableHandlerMethod.java:45) ~[spring-rabbit-3.1.7.jar:3.1.7]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:119) ~[spring-messaging-6.1.12.jar:6.1.12]
at org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:75) ~[spring-rabbit-3.1.7.jar:3.1.7]
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:278) ~[spring-rabbit-3.1.7.jar:3.1.7]
... 15 common frames omitted
2024-12-08T17:12:04.064+01:00  WARN 14059 --- [ntContainer#1-1] ingErrorHandler$DefaultExceptionStrategy : Fatal message conversion error; message rejected; it will be dropped or routed to a dead letter exchange, if so configured: (Body:'[B@74c001b3(byte[12445])' MessageProperties [headers={__ContentTypeId__=java.lang.Object, __KeyTypeId__=java.lang.Object, __TypeId__=java.util.ImmutableCollections$Map1}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=rabbitmqexchange, receivedRoutingKey=previouschatroutingkey, deliveryTag=1, consumerTag=amq.ctag-w0X0omaQgnwqzS7X792LQw, consumerQueue=previouschatqueue])
2024-12-08T17:12:04.064+01:00 ERROR 14059 --- [ntContainer#1-1] o.s.a.r.l.SimpleMessageListenerContainer : Execution of Rabbit message listener failed, and the error handler threw an exception
org.springframework.amqp.AmqpRejectAndDontRequeueException: Error Handler converted exception to fatal
at org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler.handleError(ConditionalRejectingErrorHandler.java:147) ~[spring-rabbit-3.1.7.jar:3.1.7]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeErrorHandler(AbstractMessageListenerContainer.java:1475) ~[spring-rabbit-3.1.7.jar:3.1.7]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.handleListenerException(AbstractMessageListenerContainer.java:1768) ~[spring-rabbit-3.1.7.jar:3.1.7]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListenerAndHandleException(AbstractMessageListenerContainer.java:1549) ~[spring-rabbit-3.1.7.jar:3.1.7]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.lambda$executeListener$8(AbstractMessageListenerContainer.java:1506) ~[spring-rabbit-3.1.7.jar:3.1.7]
at io.micrometer.observation.Observation.observe(Observation.java:499) ~[micrometer-observation-1.13.3.jar:1.13.3]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1506) ~[spring-rabbit-3.1.7.jar:3.1.7]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1082) ~[spring-rabbit-3.1.7.jar:3.1.7]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1018) ~[spring-rabbit-3.1.7.jar:3.1.7]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1421) ~[spring-rabbit-3.1.7.jar:3.1.7]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1322) ~[spring-rabbit-3.1.7.jar:3.1.7]
at java.base/java.lang.Thread.run(Thread.java:840) ~[na:na]
Caused by: org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Listener method 'void com.edugreat.akademiksresource.chat.consumer.ChatConsumer.publichPreviousChatsMessages(java.util.Map<java.lang.Integer, java.util.List<com.edugreat.akademiksresource.chat.dto.ChatDTO>>)' threw exception
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:287) ~[spring-rabbit-3.1.7.jar:3.1.7]
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandlerAndProcessResult(MessagingMessageListenerAdapter.java:225) ~[spring-rabbit-3.1.7.jar:3.1.7]
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:149) ~[spring-rabbit-3.1.7.jar:3.1.7]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1682) ~[spring-rabbit-3.1.7.jar:3.1.7]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1604) ~[spring-rabbit-3.1.7.jar:3.1.7]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1592) ~[spring-rabbit-3.1.7.jar:3.1.7]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1583) ~[spring-rabbit-3.1.7.jar:3.1.7]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListenerAndHandleException(AbstractMessageListenerContainer.java:1528) ~[spring-rabbit-3.1.7.jar:3.1.7]
... 8 common frames omitted
Caused by: java.lang.ClassCastException: class java.lang.String cannot be cast to class java.lang.Integer (java.lang.String and java.lang.Integer are in module java.base of loader 'bootstrap')
at com.edugreat.akademiksresource.chat.consumer.ChatConsumer.publichPreviousChatsMessages(ChatConsumer.java:94) ~[classes/:na]
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) ~[na:na]
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
at java.base/java.lang.reflect.Method.invoke(Method.java:569) ~[na:na]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:169) ~[spring-messaging-6.1.12.jar:6.1.12]
at org.springframework.amqp.rabbit.listener.adapter.KotlinAwareInvocableHandlerMethod.doInvoke(KotlinAwareInvocableHandlerMethod.java:45) ~[spring-rabbit-3.1.7.jar:3.1.7]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:119) ~[spring-messaging-6.1.12.jar:6.1.12]
at org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:75) ~[spring-rabbit-3.1.7.jar:3.1.7]
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:278) ~[spring-rabbit-3.1.7.jar:3.1.7]
POJO definition:
/**
 * Data-transfer object describing a single chat message.
 *
 * <p>Getters, setters, {@code equals}/{@code hashCode} and {@code toString} are generated by
 * Lombok's {@code @Data}. Because a constructor is declared explicitly, Lombok does not add
 * one of its own.
 */
@Data
public class ChatDTO {

    /** Identifier of the chat message; validated as non-negative. */
    @Min(value = 0)
    private Integer id;

    /** Identifier of the group the message belongs to; validated as non-negative. */
    @Min(value = 0)
    private Integer groupId;

    /** Identifier of the sender; validated as non-negative. */
    @Min(value = 0)
    private Integer senderId;

    /** Name of the sender; not set by any constructor, only through its setter. */
    @Transient
    private String senderName;

    /** Online-member count; not set by any constructor, only through its setter. */
    @Transient
    private long onlineMembers;

    /** Text of the message; validated as neither {@code null} nor empty. */
    @NotNull(message = "chat message is missing")
    @NotEmpty(message = "chat message is missing")
    private String content;

    /** Time at which the message was sent. */
    private LocalDateTime sentAt;

    /**
     * Creates an empty instance whose properties are populated through the setters.
     *
     * <p>Declaring the five-argument constructor suppresses the implicit default
     * constructor, so it is spelled out here for code that instantiates the object first and
     * fills it in afterwards (for example setter-based JSON deserialization).
     */
    public ChatDTO() {
    }

    /**
     * Creates a chat message with its persistent properties; {@code senderName} and
     * {@code onlineMembers} keep their default values.
     *
     * @param id       identifier of the chat message
     * @param groupId  identifier of the group the message belongs to
     * @param senderId identifier of the sender
     * @param content  text of the message
     * @param sentAt   time at which the message was sent
     */
    public ChatDTO(@Min(0) Integer id, @Min(0) Integer groupId, @Min(0) Integer senderId,
            @NotNull(message = "chat message is missing") @NotEmpty(message = "chat message is missing") String content,
            LocalDateTime sentAt) {
        this.id = id;
        this.groupId = groupId;
        this.senderId = senderId;
        this.content = content;
        this.sentAt = sentAt;
    }
}
Consumer service:
@Service
public class ChatConsumer implements ChatConsumerInterface {
// DAO used by the listener below to fetch the member IDs of a chat group.
@Autowired
private GroupMembersDao groupMembersDao;
// DAO for group chats; not referenced in the part of the class shown here.
@Autowired
private GroupChatDao groupChatDao;
// SSE emitters keyed by an Integer ID; the listener below looks them up by student ID
// and removes an entry when sending to it fails with an IOException.
private final Map<Integer, SseEmitter> emitters = new ConcurrentHashMap<>();
// Class-wide SLF4J logger.
private static final Logger LOGGER = LoggerFactory.getLogger(ChatConsumer.class);
/**
 * Listener for the previous-chat queue: forwards a student's previous chat messages to that
 * student's SSE emitter, if one is registered.
 *
 * <p>The first entry of {@code chats} is used: its key is taken as the student ID and its
 * value as that student's chats. The group ID of the first chat selects the group whose
 * member IDs are loaded, and every chat is stamped with the resulting online-member count
 * before being sent as an SSE event named {@code "chats"}.
 *
 * <p>A payload that is {@code null}, has no entries, or whose first entry has no chats is
 * logged and ignored instead of failing with an index or null-pointer error.
 *
 * @param chats map from student ID to that student's previous chats
 */
@RabbitListener(queues = "${previous.chat.queue}")
void publichPreviousChatsMessages(Map<Integer, List<ChatDTO>> chats) {
    if (chats == null || chats.isEmpty()) {
        LOGGER.warn("ignoring previous-chat message without any entries");
        return;
    }

    // Only the first entry is processed, as before; read key and value in a single lookup.
    Map.Entry<Integer, List<ChatDTO>> firstEntry = chats.entrySet().iterator().next();
    Integer studentId = firstEntry.getKey();
    List<ChatDTO> studentChats = firstEntry.getValue();
    if (studentChats == null || studentChats.isEmpty()) {
        LOGGER.warn("ignoring previous-chat message without chats for student {}", studentId);
        return;
    }

    // get IDs of group members
    List<Integer> memberIds = groupMembersDao.getMemberIds(studentChats.get(0).getGroupId());
    final long currentlyOnline = currentlyOnlineMembers(memberIds);

    // check if the user is online; a single get() avoids a null emitter when the entry is
    // removed between a containsKey() and a later get()
    SseEmitter emitter = emitters.get(studentId);
    if (emitter == null) {
        return;
    }

    for (ChatDTO chat : studentChats) {
        chat.setOnlineMembers(currentlyOnline);
    }
    try {
        emitter.send(SseEmitter.event().data(studentChats).name("chats"));
    } catch (IOException e) {
        // The emitter can no longer be written to; drop it and record the failure.
        emitters.remove(studentId);
        LOGGER.warn("unable to send previous chat to student {}", studentId, e);
    }
}
Message producer:
@Service
public class ChatBroadCastingService implements ChatBroadcaster {
// Template used by previousChatMessages to convert and publish the chat map.
@Autowired
private RabbitTemplate rabbitTemplate;
// Class-wide SLF4J logger.
private static final Logger LOGGER = LoggerFactory.getLogger(ChatBroadCastingService.class);
/**
 * Publishes a map of previous chats through the {@code RabbitTemplate}, using the
 * {@code exchange} and {@code previousChatRoutingKey} values declared elsewhere in this
 * class. A {@code null} or empty map is ignored.
 *
 * @param chats map from an Integer ID to the chats to publish
 */
@Override
public void previousChatMessages(Map<Integer, List<ChatDTO>> chats) {
    if (chats == null || chats.isEmpty()) {
        return;
    }
    // Parameterized logging: the map is only rendered to text when INFO is enabled.
    LOGGER.info("Produce chat {}", chats);
    rabbitTemplate.convertAndSend(exchange, previousChatRoutingKey, chats);
}
Config class:
@Configuration
public class RabbitMQConfig {
/**
 * Bean definition for the topic exchange, constructed from the {@code exchange} value
 * declared outside this snippet.
 *
 * @return a new {@code TopicExchange}
 */
@Bean
TopicExchange exchange() {
    final TopicExchange topicExchange = new TopicExchange(exchange);
    return topicExchange;
}
/**
 * Bean definition binding the queue returned by {@code previousChatQueue()} to the exchange
 * returned by {@link #exchange()}, using {@code previousChatRoutinKey} as the routing key.
 *
 * @return the resulting {@code Binding}
 */
@Bean
Binding previousChatBinding() {
    // Built step by step, in the same call order as a single fluent chain.
    var boundQueue = BindingBuilder.bind(previousChatQueue());
    var boundToExchange = boundQueue.to(exchange());
    return boundToExchange.with(previousChatRoutinKey);
}
/**
 * Bean definition for the message converter: a {@code Jackson2JsonMessageConverter} created
 * with its no-argument constructor.
 *
 * @return a new JSON message converter
 */
@Bean
MessageConverter converter() {
    final MessageConverter jsonConverter = new Jackson2JsonMessageConverter();
    return jsonConverter;
}
/**
 * Bean definition that creates a {@code RabbitTemplate} on the given connection factory,
 * installs the converter returned by {@link #converter()} on it, and exposes it under the
 * {@code AmqpTemplate} type.
 *
 * @param connectionFactory connection factory passed to the template's constructor
 * @return the configured template
 */
@Bean
AmqpTemplate amqpTemplate(ConnectionFactory connectionFactory) {
    final RabbitTemplate template = new RabbitTemplate(connectionFactory);
    final MessageConverter jsonConverter = converter();
    template.setMessageConverter(jsonConverter);
    return template;
}
I expect the AmqpTemplate
to convert and send a Map
with keys of type Integer
and values of type List<ChatDTO>
, but it did not. The LOGGER
output on the producer side, however, did show the map with the expected contents.
Upvotes: 0
Views: 25