Reputation: 271
I am using Spring Boot 2.1.6 and integrating a Kafka consumer that should consume any type of message published on a topic. For reference I am following https://docs.spring.io/spring-boot/docs/2.1.6.RELEASE/reference/htmlsingle/
I have this dependency in my pom:
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
I am configuring in application.yml
spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: foo
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring:
          json:
            value:
              default:
                type: java.lang.Object
And at last here is my listener code:
@KafkaListener(topics = "videoEnrichedEvents")
public void consume(@Payload VideoEnrichedEvents videoEnrichedEvents) {
    LOGGER.debug("Consumed message :" + videoEnrichedEvents);
    System.out.println("Consumed Message :" + videoEnrichedEvents);
}
Since I have different topics with different consumers, I want the consumer configuration to be generic enough that I can read any object and then delegate it to the processing handler. In the error logs I see:
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.util.LinkedHashMap] to [com.calamp.connect.vs.model.VideoEnrichedEvents] for GenericMessage [payload={anyotherjson={groups=null, id=0, driverName=from Kusum's console, deviceIdType=null, assetId=null, operatorId=null, avlEventTime=null, videoLink=null, tripId=null, avlEventUuid=null, deviceId=null, appMessageUuid=null, parentAccountList=null, appmsgEventTime=null, enrichedMessage=null, accountId=null}}, headers={kafka_offset=9, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@18213932, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=videoEnrichedEvents, kafka_receivedTimestamp=1590218109430}], failedMessage=GenericMessage [payload={anyotherjson={groups=null, id=0, driverName=from Kusum's console, deviceIdType=null, assetId=null, operatorId=null, avlEventTime=null, videoLink=null, tripId=null, avlEventUuid=null, deviceId=null, appMessageUuid=null, parentAccountList=null, appmsgEventTime=null, enrichedMessage=null, accountId=null}}, headers={kafka_offset=9, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@18213932, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=videoEnrichedEvents, kafka_receivedTimestamp=1590218109430}]
After a little googling I found that ConsumerRecord was used instead of LinkedHashMap everywhere.
My new code looks like this:
@KafkaListener(topics = "videoEnrichedEvents")
public void consume(@Payload ConsumerRecord consumerRecord) {
    LOGGER.debug("Consumed message!!!Full :" + consumerRecord);
    System.out.println("Consumed Message!!! Actual object :" + ((LinkedHashMap) consumerRecord.value()));
}
It technically handles any object sent to me, so it serves my purpose. But my question is: why ConsumerRecord and not LinkedHashMap? Is there a specific reason?
Upvotes: 0
Views: 3908
Reputation: 174574
The simplest way to deal with this is to use a ByteArrayDeserializer together with a ByteArrayJsonMessageConverter bean (simply add it to the application context and Boot will wire it in).
This way, the conversion from JSON is deferred until right before we call the method so we know what the target type is.
See https://docs.spring.io/spring-kafka/docs/2.5.0.RELEASE/reference/html/#messaging-message-conversion
NOTE: This method can't be used with a class-level listener and @KafkaHandler methods because in that case, we use the type to select which method to call.
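As a rough sketch of the converter approach described in this answer: the class below registers the converter as a bean. The class name KafkaConverterConfig and the method name are made up for illustration. It assumes a spring-kafka version that ships ByteArrayJsonMessageConverter (the 2.5.0 docs linked above describe it; older versions may not have it), and that value-deserializer in application.yml is changed to org.apache.kafka.common.serialization.ByteArrayDeserializer.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.support.converter.ByteArrayJsonMessageConverter;
import org.springframework.kafka.support.converter.RecordMessageConverter;

// Hypothetical configuration class; the name is illustrative only.
@Configuration
public class KafkaConverterConfig {

    // Exposing the converter as a bean lets Boot's auto-configured
    // listener container factory pick it up. The raw byte[] payload is
    // then converted from JSON to the listener method's parameter type.
    @Bean
    public RecordMessageConverter jsonMessageConverter() {
        return new ByteArrayJsonMessageConverter();
    }
}
```

With this in place, a listener declared as consume(VideoEnrichedEvents event) should receive a converted object, since the target type is taken from the method signature rather than from the deserializer configuration.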
Upvotes: 2
Reputation: 1804
The method signature is flexible and can take a ConsumerRecord or the deserialised object contained in the record. The latter relies on deserialization converting the incoming message to the type in the method signature. If Jackson cannot determine what type the incoming JSON message is, it will deserialize to a LinkedHashMap, as JSON is effectively just a map. So it is providing exactly what you asked for: an Object (where a LinkedHashMap is the only Object it is able to create with the available information).
Hence the behaviour you are seeing arises because the deserializer cannot deserialize the message to a specific class, so the method signature can accept either a Map or a ConsumerRecord, where ConsumerRecord is a valid argument for any message regardless of deserialization.
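To illustrate what working with the Map form looks like, here is a minimal stdlib-only sketch that reads a field out of an untyped payload shaped like the one in the question's error log (a wrapper key anyotherjson containing driverName). The class MapPayloadSketch and its helper method are hypothetical; in a real listener the value would come from the listener argument or from consumerRecord.value().

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class MapPayloadSketch {

    // Returns the driverName nested under wrapperKey, or null when the
    // value is not shaped like the payload in the error log.
    public static String driverName(Object value, String wrapperKey) {
        if (!(value instanceof Map)) {
            return null;
        }
        Object inner = ((Map<?, ?>) value).get(wrapperKey);
        if (!(inner instanceof Map)) {
            return null;
        }
        Object name = ((Map<?, ?>) inner).get("driverName");
        return name instanceof String ? (String) name : null;
    }

    public static void main(String[] args) {
        // Rebuild a payload with the same nesting as the logged message.
        Map<String, Object> inner = new LinkedHashMap<>();
        inner.put("id", 0);
        inner.put("driverName", "from Kusum's console");
        Map<String, Object> payload = new LinkedHashMap<>();
        payload.put("anyotherjson", inner);

        System.out.println(driverName(payload, "anyotherjson"));
    }
}
```

The instanceof checks stand in for the type information that a typed listener parameter would otherwise provide.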
If you want to handle different types in this way, it is best to use a custom Deserializer that can look at some aspect of the message and create an instance of the correct class; specify that Deserializer in place of the JsonDeserializer in your yaml.
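The dispatch logic such a custom Deserializer might contain can be sketched as follows. To keep it runnable on its own, this version uses only the JDK: it does not implement org.apache.kafka.common.serialization.Deserializer, and the per-type parsers are plain functions where a real one would likely call Jackson. The class TopicDispatchSketch, the VideoEvent type, and the choice of the topic name as the "aspect" to dispatch on are all assumptions made for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class TopicDispatchSketch {

    // Hypothetical event type standing in for a real model class.
    public static final class VideoEvent {
        public final String rawJson;
        public VideoEvent(String rawJson) { this.rawJson = rawJson; }
    }

    // One parser per topic; a real parser would map JSON onto a class.
    private final Map<String, Function<String, Object>> parsersByTopic = new HashMap<>();

    public void register(String topic, Function<String, Object> parser) {
        parsersByTopic.put(topic, parser);
    }

    // Same shape as Kafka's Deserializer#deserialize(String topic, byte[] data).
    public Object deserialize(String topic, byte[] data) {
        if (data == null) {
            return null;
        }
        String json = new String(data, StandardCharsets.UTF_8);
        Function<String, Object> parser = parsersByTopic.get(topic);
        // Fall back to the raw JSON string when no parser is registered.
        return parser != null ? parser.apply(json) : json;
    }

    public static void main(String[] args) {
        TopicDispatchSketch dispatcher = new TopicDispatchSketch();
        dispatcher.register("videoEnrichedEvents", VideoEvent::new);

        byte[] bytes = "{\"id\":0}".getBytes(StandardCharsets.UTF_8);
        Object known = dispatcher.deserialize("videoEnrichedEvents", bytes);
        Object unknown = dispatcher.deserialize("otherTopic", bytes);

        System.out.println(known.getClass().getSimpleName());   // VideoEvent
        System.out.println(unknown.getClass().getSimpleName()); // String
    }
}
```

Dispatching on the topic is only one option; a header or a type field inside the JSON could serve the same role, depending on what the producers send.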
Upvotes: 2