Reputation: 6116
I have a Spring Boot project that I'm trying to integrate with a RabbitMQ server so I can publish messages to a queue and read them back.
Here's my RabbitMQ config (edited to only show relevant details):
@Configuration
@ConfigurationProperties(prefix="rabbit")
public class RabbitConfig {
    private String queue;
    // Note: the 'durable' field and its setter were trimmed from this excerpt.

    @Bean
    Queue queue() {
        return new Queue(queue, durable);
    }

    @Bean
    public MessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    SimpleMessageListenerContainer container(ConnectionFactory connectionFactory, Queue queue,
            MessageListenerAdapter listenerAdapter, MessageConverter messageConverter) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        container.setQueues(queue);
        container.setMessageListener(listenerAdapter);
        container.setMessageConverter(messageConverter);
        return container;
    }

    @Bean
    MessageListenerAdapter listenerAdapter(Receiver receiver) {
        return new MessageListenerAdapter(receiver);
    }

    public void setQueue(String queue) {
        this.queue = queue;
    }
}
Here are my Receiver classes:
public interface Receiver {
    void handleMessage(FooA message);
}

@Component
public class RabbitReceiver implements Receiver {
    @Override
    public void handleMessage(FooA message) {
        System.out.println(message);
    }
}
And my pojo:
public class FooA {
    private double num;
    private Map<String, String> map = new HashMap<>();

    public FooA() {
    }

    public FooA(double num, Map<String, String> map) {
        this.num = num;
        this.map = map;
    }

    public double getNum() {
        return num;
    }

    public Map<String, String> getMap() {
        return map;
    }
}
I am successfully able to publish a FooA message object to the queue. Here's what it looks like in the queue:
[
  {
    "payload_bytes": 41,
    "redelivered": false,
    "exchange": "amq.fanout",
    "routing_key": "",
    "message_count": 0,
    "properties": {
      "priority": 0,
      "delivery_mode": 2,
      "headers": {
        "__TypeId__": "com.test.FooA"
      },
      "content_encoding": "UTF-8",
      "content_type": "application/json"
    },
    "payload": "{\"num\":1.2,\"map\":{}}",
    "payload_encoding": "string"
  }
]
But when I try to read from the queue I get this error:
org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Failed to invoke target method 'handleMessage' with argument type = [class [B], value = [{[B@75a7bfc9}]
at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.invokeListenerMethod(MessageListenerAdapter.java:408) ~[spring-rabbit-1.7.3.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.onMessage(MessageListenerAdapter.java:298) ~[spring-rabbit-1.7.3.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:822) ~[spring-rabbit-1.7.3.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:745) ~[spring-rabbit-1.7.3.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:97) [spring-rabbit-1.7.3.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:189) ~[spring-rabbit-1.7.3.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1276) [spring-rabbit-1.7.3.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:726) ~[spring-rabbit-1.7.3.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1219) [spring-rabbit-1.7.3.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1189) [spring-rabbit-1.7.3.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1500(SimpleMessageListenerContainer.java:97) [spring-rabbit-1.7.3.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1421) [spring-rabbit-1.7.3.RELEASE.jar:na]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_121]
Caused by: java.lang.NoSuchMethodException: com.test.RabbitReceiver.handleMessage([B)
at java.lang.Class.getMethod(Class.java:1786) ~[na:1.8.0_121]
at org.springframework.util.MethodInvoker.prepare(MethodInvoker.java:174) ~[spring-core-4.3.9.RELEASE.jar:4.3.9.RELEASE]
at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.invokeListenerMethod(MessageListenerAdapter.java:386) ~[spring-rabbit-1.7.3.RELEASE.jar:na]
... 12 common frames omitted
What am I doing wrong?
Edit 1: I changed the method to:
@Override
public void handleMessage(byte[] message) {
    System.out.println(message);
}
Which worked but it's totally unusable. It just shows up like this:
How do I get this to map to my POJO FooA?
Upvotes: 3
Views: 10912
Reputation: 925
This article shows how to solve this in an easier way: https://thepracticaldeveloper.com/produce-and-consume-json-messages-with-spring-boot-amqp/
Configuration:
@Bean
public RabbitTemplate rabbitTemplate(final ConnectionFactory connectionFactory) {
    final var rabbitTemplate = new RabbitTemplate(connectionFactory);
    rabbitTemplate.setMessageConverter(producerJackson2MessageConverter());
    return rabbitTemplate;
}

@Bean
public Jackson2JsonMessageConverter producerJackson2MessageConverter() {
    return new Jackson2JsonMessageConverter();
}
Listener:
@RabbitListener(queues = MessagingApplication.QUEUE_SPECIFIC_NAME)
public void receiveMessage(final CustomMessage customMessage) {
    log.info("Received message and deserialized to 'CustomMessage': {}", customMessage.toString());
}
POJO:
public record CustomMessage(@JsonProperty("text") String text,
                            @JsonProperty("priority") int priority,
                            @JsonProperty("secret") boolean secret)
        implements Serializable {
}
Upvotes: 0
Reputation: 1
You are doing everything correctly, but you are not telling MessageListenerAdapter which method should receive and process your message. That's why you are getting ListenerExecutionFailedException: Failed to invoke target method 'handleMessage'.
To fix this, change the code below:
@Bean
MessageListenerAdapter listenerAdapter(Receiver receiver) {
    return new MessageListenerAdapter(receiver);
}
to:
@Bean
MessageListenerAdapter listenerAdapter(Receiver receiver) {
    return new MessageListenerAdapter(receiver, "handleMessage");
}
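The stack trace in the question shows the adapter already looking for handleMessage, only with a byte[] argument, so naming the method may not be enough by itself. A variation worth trying, sketched here under assumptions, is to hand the JSON converter to the adapter as well, so the body is converted before the delegate is invoked. The class name ListenerAdapterConfig is hypothetical; Receiver is the interface from the question, and the behaviour should be checked against your spring-rabbit version.

```java
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ListenerAdapterConfig { // hypothetical name, for illustration only

    @Bean
    public MessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    MessageListenerAdapter listenerAdapter(Receiver receiver, MessageConverter jsonMessageConverter) {
        // Receiver is the interface defined in the question.
        MessageListenerAdapter adapter = new MessageListenerAdapter(receiver, "handleMessage");
        // Assumption: the adapter, not the container, performs the payload conversion,
        // so the JSON converter is set here.
        adapter.setMessageConverter(jsonMessageConverter);
        return adapter;
    }
}
```

With this in place the converter can use the __TypeId__ header shown in the queued message to pick the target class, provided that class is on the consumer's classpath and Jackson can populate it.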
Upvotes: 0
Reputation: 23
Convert the byte array to a String and use Jackson's ObjectMapper to map it to your POJO. You can also configure the mapper, e.g. inside an ObjectMapper subclass:
this.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
this.configure(SerializationFeature.WRITE_NULL_MAP_VALUES, false);
this.setSerializationInclusion(JsonInclude.Include.NON_EMPTY);
this.registerModule(new JavaTimeModule());
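The first step, turning the byte array into a String, can be sketched with the JDK alone. The helper class PayloadDecoder is hypothetical, and UTF-8 is assumed because the queued message in the question reports a content_encoding of UTF-8.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of the original post.
final class PayloadDecoder {

    private PayloadDecoder() {
    }

    // Decodes the raw message body as UTF-8 text (assumed encoding).
    static String decode(byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] body = "{\"num\":1.2,\"map\":{}}".getBytes(StandardCharsets.UTF_8);

        // Printing the array directly only shows its type and identity hash.
        System.out.println(body);

        // Decoding first yields the JSON text.
        System.out.println(decode(body));
    }
}
```

The decoded String could then be passed to ObjectMapper.readValue(json, FooA.class). That step needs Jackson on the classpath, so it is left out to keep the sketch runnable on the JDK alone.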
Upvotes: 1
Reputation: 6116
I just had the classes implement the Serializable interface and removed the JsonMessageConverter. JSON and Serializable were conflicting, so it didn't work.
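For context, without a JSON converter Spring AMQP's default converter falls back to Java serialization for Serializable payloads. A minimal sketch of what that implies for the POJO, using only JDK streams, might look like this. SerializableFoo and SerializationRoundTrip are hypothetical names standing in for the question's class, and producer and consumer are assumed to share the same class definition.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the question's POJO, made Serializable.
class SerializableFoo implements Serializable {
    private static final long serialVersionUID = 1L;

    private final double num;
    private final Map<String, String> map;

    SerializableFoo(double num, Map<String, String> map) {
        this.num = num;
        this.map = new HashMap<>(map); // HashMap is itself Serializable
    }

    double getNum() {
        return num;
    }

    Map<String, String> getMap() {
        return map;
    }
}

final class SerializationRoundTrip {

    private SerializationRoundTrip() {
    }

    // Serializes a value to bytes with standard Java serialization.
    static byte[] toBytes(Serializable value) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    // Rebuilds the object from bytes; the class must be on the reader's classpath.
    static Object fromBytes(byte[] bytes) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Map<String, String> map = new HashMap<>();
        map.put("key", "value");
        SerializableFoo copy = (SerializableFoo) fromBytes(toBytes(new SerializableFoo(1.2, map)));
        System.out.println(copy.getNum() + " " + copy.getMap());
    }
}
```

The trade-off is that the payload becomes an opaque Java-specific binary blob rather than readable JSON, so non-Java consumers cannot read it.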
Upvotes: 1
Reputation: 9516
It looks like the message is posted as a String and not as JSON. That's why you see
"payload_encoding": "string"
and Spring automatically converts this string into a byte[], as shown in the screenshot.
You also need to set the message converter on the RabbitTemplate, like this:
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory, MessageConverter rabbitJsonMessageConverter) {
    RabbitTemplate template = new RabbitTemplate(connectionFactory);
    template.setMessageConverter(rabbitJsonMessageConverter);
    return template;
}
This should correctly publish the message as JSON.
Upvotes: 0