kenshinji

Reputation: 2091

How to deal with JSON message with spring-rabbit in spring boot application?

Here are my code snippets.

However, I got the following error message when I ran my application.

2017-02-28 17:16:35.931  WARN 11828 --- [cTaskExecutor-1] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed.

org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:872) ~[spring-rabbit-1.7.0.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:782) ~[spring-rabbit-1.7.0.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:702) ~[spring-rabbit-1.7.0.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:95) [spring-rabbit-1.7.0.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:186) ~[spring-rabbit-1.7.0.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1227) [spring-rabbit-1.7.0.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:683) ~[spring-rabbit-1.7.0.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1181) [spring-rabbit-1.7.0.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1165) [spring-rabbit-1.7.0.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1500(SimpleMessageListenerContainer.java:95) [spring-rabbit-1.7.0.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1367) [spring-rabbit-1.7.0.RELEASE.jar:na]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_60]
Caused by: org.springframework.amqp.AmqpException: No method found for class [B
    at org.springframework.amqp.rabbit.listener.adapter.DelegatingInvocableHandler.getHandlerForPayload(DelegatingInvocableHandler.java:127) ~[spring-rabbit-1.7.0.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.adapter.DelegatingInvocableHandler.getMethodNameFor(DelegatingInvocableHandler.java:224) ~[spring-rabbit-1.7.0.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.getMethodAsString(HandlerAdapter.java:61) ~[spring-rabbit-1.7.0.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:140) ~[spring-rabbit-1.7.0.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:106) ~[spring-rabbit-1.7.0.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:779) ~[spring-rabbit-1.7.0.RELEASE.jar:na]
    ... 10 common frames omitted

So what should I do if all I want is to print the JSON message in the receive() method? I'd really appreciate it if anyone could shed some light on this. :)

Upvotes: 11

Views: 26756

Answers (5)

Mr.Cat

Reputation: 330

Your incoming message should have the RabbitMQ property content_type set to application/json,

and you need to declare a JSON message converter bean in your app:

@Bean
public MessageConverter jackson2JsonMessageConverter(ObjectMapper customObjectMapper) {
    return new Jackson2JsonMessageConverter(customObjectMapper); // The customObjectMapper param here is optional
}

If you're relying on Spring Boot auto-configuration, it should be picked up by all listeners automatically.

If you're not using Spring Boot or doing the configuration programmatically, then you can add it to SimpleMessageListenerContainer or SimpleRabbitListenerContainerFactory beans

simpleRabbitListenerContainerFactory.setMessageConverter(jackson2JsonMessageConverter);

or you can override it in @RabbitListener

@RabbitListener(messageConverter = "jackson2JsonMessageConverter")

After doing that, you're most probably going to end up with another similar exception

org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.util.LinkedHashMap] to ... 

This is because you're using @RabbitListener on class level.
From Message Conversion for Annotated Methods :: Spring AMQP:

There are two conversion steps in the pipeline before invoking the listener. The first step uses a MessageConverter to convert the incoming Spring AMQP Message to a Spring-messaging Message. When the target method is invoked, the message payload is converted, if necessary, to the method parameter type.

and

In versions prior to 1.6, the type information to convert the JSON had to be provided in message headers, or a custom ClassMapper was required. Starting with version 1.6, if there are no type information headers, the type can be inferred from the target method arguments.

Note: This type inference works only for @RabbitListener at the method level.

So you can either:

  1. Set the __TypeId__ header to the fully qualified name of your POJO class, but then every system that publishes messages for your app to consume has to know this information and set this header.

  2. Or you can move @RabbitListener to method level


@Component
public class Receiver {

    @RabbitListener(queues = "testMQ")
    public void receive(Message msg){
        System.out.println(msg.toString());
    }
}
  3. Or you can deserialize the JSON string to a POJO yourself, something like:
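@Component
@RabbitListener(queues = "testMQ")
public class Receiver {

    // Jackson mapper used for the manual conversion
    private final ObjectMapper objectMapper = new ObjectMapper();

    @RabbitHandler
    public void receive(Message message) throws IOException { // NOTE: this is org.springframework.amqp.core.Message
        SomeDTO dto = objectMapper.readValue(message.getBody(), SomeDTO.class);
        System.out.println(dto);
    }
}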
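
To illustrate why a class-level @RabbitListener ends in "No method found for class ...", here is a toy model in plain Java. It is not Spring's code, only a sketch that assumes what the stack traces suggest: the handler is picked by the runtime class of the payload after conversion. HandlerDispatchSketch, Order and demo are hypothetical names made up for this example.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.function.Consumer;

// Toy model of payload-type dispatch; NOT the Spring AMQP implementation.
class HandlerDispatchSketch {

    // Hypothetical POJO standing in for the type the handler method expects.
    static final class Order {
        final String id;
        Order(String id) { this.id = id; }
    }

    // One registered handler: the parameter type it accepts and the code to run.
    static final class Handler<T> {
        final Class<T> paramType;
        final Consumer<T> body;
        Handler(Class<T> paramType, Consumer<T> body) {
            this.paramType = paramType;
            this.body = body;
        }
        boolean accepts(Object payload) { return paramType.isInstance(payload); }
        void invoke(Object payload) { body.accept(paramType.cast(payload)); }
    }

    private final List<Handler<?>> handlers = new ArrayList<>();

    <T> void register(Class<T> type, Consumer<T> body) {
        handlers.add(new Handler<>(type, body));
    }

    // The payload is already converted when we get here, so the only
    // information available for choosing a handler is its runtime class.
    String dispatch(Object payload) {
        for (Handler<?> h : handlers) {
            if (h.accepts(payload)) {
                h.invoke(payload);
                return "handled by " + h.paramType.getSimpleName();
            }
        }
        return "No method found for " + payload.getClass();
    }

    // Builds a dispatcher with a single Order handler and feeds it one payload.
    static String demo(Object payload) {
        HandlerDispatchSketch dispatcher = new HandlerDispatchSketch();
        dispatcher.register(Order.class, order -> System.out.println("Got order " + order.id));
        return dispatcher.dispatch(payload);
    }

    public static void main(String[] args) {
        // No JSON converter: the payload stays a raw byte[] (JVM name "[B").
        System.out.println(demo("{}".getBytes()));
        // JSON converter without type information: the payload becomes a map.
        System.out.println(demo(new LinkedHashMap<String, Object>()));
        // Type information available (e.g. a __TypeId__ header): the payload is the POJO.
        System.out.println(demo(new Order("42")));
    }
}
```

In this model the first two calls find no handler, which matches the two exceptions quoted in this thread. A method-level listener has no such lookup step, so the parameter type of the single method can drive the conversion.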

Upvotes: 0

The Wind

Reputation: 21

I know it's been a while since the last response, but I'd like to add my answer in case someone else is having the same issue.

When I set up a receiver like yours, I ran into a similar problem: I couldn't convert the JSON in the body of the RabbitMQ message to my object. It simply returned an error message like this.

org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Failed to convert message
        at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:146) ~[spring-rabbit-2.3.9.jar:2.3.9]
        at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1654) ~[spring-rabbit-2.3.9.jar:2.3.9]
        at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1573) ~[spring-rabbit-2.3.9.jar:2.3.9]
        at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1561) ~[spring-rabbit-2.3.9.jar:2.3.9]
        at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1552) ~[spring-rabbit-2.3.9.jar:2.3.9]
        at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1496) ~[spring-rabbit-2.3.9.jar:2.3.9]
        at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:968) [spring-rabbit-2.3.9.jar:2.3.9]
        at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:914) [spring-rabbit-2.3.9.jar:2.3.9]
        at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:83) [spring-rabbit-2.3.9.jar:2.3.9]
        at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1289) [spring-rabbit-2.3.9.jar:2.3.9]
        at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1195) [spring-rabbit-2.3.9.jar:2.3.9]
        at java.lang.Thread.run(Thread.java:748) [na:1.8.0_271]
Caused by: org.springframework.amqp.AmqpException: No method found for class java.util.LinkedHashMap
        at org.springframework.amqp.rabbit.listener.adapter.DelegatingInvocableHandler.getHandlerForPayload(DelegatingInvocableHandler.java:185) ~[spring-rabbit-2.3.9.jar:2.3.9]
        at org.springframework.amqp.rabbit.listener.adapter.DelegatingInvocableHandler.getMethodFor(DelegatingInvocableHandler.java:317) ~[spring-rabbit-2.3.9.jar:2.3.9]
        at org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.getMethodFor(HandlerAdapter.java:110) ~[spring-rabbit-2.3.9.jar:2.3.9]
        at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandlerAndProcessResult(MessagingMessageListenerAdapter.java:192) ~[spring-rabbit-2.3.9.jar:2.3.9]
        at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:137) ~[spring-rabbit-2.3.9.jar:2.3.9]
        ... 11 common frames omitted

After hours of searching through different websites without any good answer, I finally found out that @RabbitListener won't work at the class level. When you move it to the method level, though, it works fine.

So I removed @RabbitHandler and moved @RabbitListener to the receive method.

Your receiver code should look like this after editing.

public class Receiver {

    @RabbitListener(queues = "testMQ")
    public void receive(Message msg){
        System.out.println(msg.toString());
    }
}

I still don't fully understand why it is not working at the class level. If anyone has a good explanation, please share it in the comments section. Thank you very much.

Source: https://titanwolf.org/Network/Articles/Article?AID=0f214d79-10f0-4b4c-9478-607428770256#gsc.tab=0

Upvotes: 1

Musab Abdelrahman

Reputation: 45

A simple way is to build a String from the message body with new String(msg.getBody()):

@RabbitListener(queues = "testMQ")
public class Receiver {

    @RabbitHandler
    public void receive(Message msg){
        String MQMessage = new String(msg.getBody());
        System.out.println(MQMessage);
    }
}
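
One caveat: new String(byte[]) decodes with the JVM's default charset, which may not be UTF-8 on every machine. A minimal sketch of decoding with an explicit charset, assuming the publisher sends UTF-8 (BodyDecoder and decodeUtf8 are hypothetical names for this example, and the byte array stands in for msg.getBody()):

```java
import java.nio.charset.StandardCharsets;

// Decodes a message body with an explicit charset instead of the JVM default.
class BodyDecoder {

    // Assumes the publisher encoded the body as UTF-8.
    static String decodeUtf8(byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Stand-in for msg.getBody(); \u00fc is a non-ASCII character (u with umlaut).
        byte[] body = "{\"city\":\"Z\u00fcrich\"}".getBytes(StandardCharsets.UTF_8);
        System.out.println(decodeUtf8(body));
    }
}
```

Inside the listener this would be decodeUtf8(msg.getBody()) in place of new String(msg.getBody()).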

Upvotes: 1

Amir

Reputation: 2108

To send JSON to RabbitMQ and consume it with Spring Boot, we need to set content_type.

Let me describe with an example where I had a Python Producer and Java consumer (I was sending the JSON from Python to RabbitMQ and Spring Boot Java was supposed to receive the JSON task).

There are two solutions:

Solution 1: Send a JSON string and convert it manually using Jackson or Gson

You need to set content_type="text/plain" and convert the JSON to a string. Then, on the Spring side, use a function that takes a String as the listener and convert it to the object manually.

RabbitHandler:

@RabbitHandler
public void receive(String inputString) throws IOException {
    ObjectMapper objectMapper = new ObjectMapper();
    SimStatusReport theResult = objectMapper.readValue(inputString, SimStatusReport.class);

    System.out.println("String instance "  + theResult.toString() +
            " [x] Received");
}

SimStatusReport Object:

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class SimStatusReport {
    private String id;
    private int t;
}

Here is my Python Code:

import pika
import json
import uuid


connectionResult = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channelResult = connectionResult.channel()
routing_key_result = 'sim_results'
channelResult.queue_declare(queue=routing_key_result, durable=True)

def publish_result(sim_status):
    message = json.dumps(sim_status)
    channelResult.basic_publish(
        exchange='',
        routing_key=routing_key_result,
        body=message,
        properties=pika.BasicProperties(
            content_type="text/plain",
            content_encoding='UTF-8',
            delivery_mode=2,  # make message persistent
        ))
    print("Sent ", message)


newsim_status = {'id': str(uuid.uuid4()), 't': 0}
publish_result(newsim_status)

Solution 2: Send a JSON string and let Jackson2JsonMessageConverter do the conversion for you automatically

You need to set content_type="application/json". Then you need to add a __TypeId__ header to the RabbitMQ message. Its value must be the fully qualified class name of the object so that Jackson knows which type to convert to.

Here is my example using Python (just the publish_result function):

def publish_result(sim_status):
    message = json.dumps(sim_status)
    channelResult.basic_publish(
        exchange='',
        routing_key=routing_key_result,
        body=message,
        properties=pika.BasicProperties(
            content_type="application/json",
            headers={'__TypeId__': 'com.zarinbal.simtest.run.model.SimStatusReport'},
            content_encoding='UTF-8',
            delivery_mode=2,  # make message persistent
        ))
    print("Sent ", message)

Then you need to configure the Java side to use Jackson2JsonMessageConverter:

@Configuration
public class RabbitConfiguration {
    @Bean
    public MessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }
}

Here would be your listener:

@RabbitListener(queues = "sim_results")
public class TaskReceiver {
    @RabbitHandler
    public void receive(SimStatusReport in) {
        System.out.println("Object instance "  + in +
                " [x] Received");
    }
}

Note: Make sure all your objects have setters and getters for all properties and an all-arguments constructor. I use @Data, @NoArgsConstructor and @AllArgsConstructor from Lombok to generate them automatically.

Upvotes: 5

Artem Bilan

Reputation: 121552

If you use Spring Boot, you just need to configure:

@Bean
public MessageConverter jsonMessageConverter() {
    return new Jackson2JsonMessageConverter();
}

Otherwise you have to configure:

@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
...
    factory.setMessageConverter(new Jackson2JsonMessageConverter());
...
    return factory;
}

http://docs.spring.io/spring-amqp/docs/1.7.0.RELEASE/reference/html/_reference.html#async-annotation-driven

Upvotes: 14
