Kusum

Reputation: 271

Kafka with Spring Boot: consumer to consume java.lang.Object (any/all objects) via ConsumerRecord

I am using Spring Boot 2.1.6 and integrating a Kafka consumer that should consume any type of message published on a topic. For reference, I am following https://docs.spring.io/spring-boot/docs/2.1.6.RELEASE/reference/htmlsingle/

I have this dependency in my pom:

    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>

This is my configuration in application.yml:

    spring:
      kafka:
        bootstrap-servers: localhost:9092
        consumer:
          group-id: foo
          auto-offset-reset: earliest
          key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
          properties:
            spring:
              json:
                value:
                  default:
                    type: java.lang.Object

Finally, here is my listener code:

    @KafkaListener(topics = "videoEnrichedEvents")
    public void consume(@Payload VideoEnrichedEvents videoEnrichedEvents) {
        LOGGER.debug("Consumed message :" + videoEnrichedEvents);
        System.out.println("Consumed Message :" + videoEnrichedEvents);
    }

Since I have different topics and different consumers for them, I want the consumer configuration to be generic enough that I can read any object and then delegate it to the processing handler. In the error logs I could see:

Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.util.LinkedHashMap] to [com.calamp.connect.vs.model.VideoEnrichedEvents] for GenericMessage [payload={anyotherjson={groups=null, id=0, driverName=from Kusum's console, deviceIdType=null, assetId=null, operatorId=null, avlEventTime=null, videoLink=null, tripId=null, avlEventUuid=null, deviceId=null, appMessageUuid=null, parentAccountList=null, appmsgEventTime=null, enrichedMessage=null, accountId=null}}, headers={kafka_offset=9, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@18213932, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=videoEnrichedEvents, kafka_receivedTimestamp=1590218109430}], failedMessage=GenericMessage [payload={anyotherjson={groups=null, id=0, driverName=from Kusum's console, deviceIdType=null, assetId=null, operatorId=null, avlEventTime=null, videoLink=null, tripId=null, avlEventUuid=null, deviceId=null, appMessageUuid=null, parentAccountList=null, appmsgEventTime=null, enrichedMessage=null, accountId=null}}, headers={kafka_offset=9, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@18213932, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=videoEnrichedEvents, kafka_receivedTimestamp=1590218109430}]

After a little googling I found that ConsumerRecord was used everywhere instead of LinkedHashMap, and my new code looks like:

    @KafkaListener(topics = "videoEnrichedEvents")
    public void consume(@Payload ConsumerRecord consumerRecord) {
        LOGGER.debug("Consumed message!!!Full :" + consumerRecord);
        System.out.println("Consumed Message!!! Actual object :" + (LinkedHashMap) consumerRecord.value());
    }

It technically handles any object sent to me, so it serves my purpose. But my question is: why ConsumerRecord and not LinkedHashMap? Is there any specific reason?

Upvotes: 0

Views: 3908

Answers (2)

Gary Russell

Reputation: 174574

The simplest way to deal with this is to use a ByteArrayDeserializer together with a ByteArrayJsonMessageConverter bean (simply add it to the application context and Boot will wire it in).

This way, the conversion from JSON is deferred until right before we call the method so we know what the target type is.

See https://docs.spring.io/spring-kafka/docs/2.5.0.RELEASE/reference/html/#messaging-message-conversion

NOTE: This method can't be used with a class-level listener and @KafkaHandler methods because in that case, we use the type to select which method to call.
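As a minimal sketch of the converter bean described above, the configuration might look like the following. It assumes a spring-kafka version that ships `ByteArrayJsonMessageConverter` (the linked documentation is for 2.5.0; older versions may only offer other JSON converters), and that `value-deserializer` in application.yml has been changed to `org.apache.kafka.common.serialization.ByteArrayDeserializer`. The class name `KafkaConverterConfig` and the bean method name are hypothetical.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.support.converter.ByteArrayJsonMessageConverter;
import org.springframework.kafka.support.converter.RecordMessageConverter;

// Hypothetical configuration class; the name is illustrative only.
@Configuration
public class KafkaConverterConfig {

    // Exposing the converter as a bean is enough: Boot wires it into the
    // listener container factory. The raw byte[] payload is then converted
    // from JSON to whatever type the @KafkaListener method parameter declares.
    @Bean
    public RecordMessageConverter jsonMessageConverter() {
        return new ByteArrayJsonMessageConverter();
    }
}
```

With this in place, a listener declared as `consume(VideoEnrichedEvents event)` should receive the converted object, while a listener on another topic can declare a different parameter type without any change to the consumer configuration.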

Upvotes: 2

Chris

Reputation: 1804

The method signature is flexible: it can take a ConsumerRecord or the deserialized object contained in the record. The latter relies on deserialization converting the incoming message to the type in the method signature. If Jackson cannot determine the type of the incoming JSON message, it deserializes it to a LinkedHashMap, since a JSON object is effectively just a map. So it is providing exactly what you asked for: an Object (where a LinkedHashMap is the only Object it is able to create with the available information).

Hence the behaviour you are seeing arises because the deserializer cannot deserialize the message to a specific class, so the method signature can accept either a Map or a ConsumerRecord. A ConsumerRecord is a valid argument for any message, regardless of deserialization.

If you want to handle different types in this way, it is best to use a custom Deserializer that can look at some aspect of the message and create an instance of the correct class. Specify that Deserializer instead of the JsonDeserializer you have in your yaml.
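To illustrate the routing idea, here is a dependency-free sketch of the dispatch logic such a deserializer could contain. It is not a Kafka `Deserializer` implementation: the class `TopicRoutingDeserializer` and its `register` method are hypothetical, and routing by topic name is an assumption made for simplicity (a real version might inspect a field or header of the message instead). Only the `deserialize(String topic, byte[] data)` shape mirrors Kafka's interface.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch: picks a parser per topic instead of one fixed target type.
final class TopicRoutingDeserializer {

    // Topic name -> function that turns the raw bytes into the right object.
    private final Map<String, Function<byte[], ?>> parsers = new HashMap<>();

    // Registers a parser for a topic; returns this to allow chaining.
    TopicRoutingDeserializer register(String topic, Function<byte[], ?> parser) {
        parsers.put(topic, parser);
        return this;
    }

    // Same parameter shape as Kafka's Deserializer#deserialize(String, byte[]).
    Object deserialize(String topic, byte[] data) {
        if (data == null) {
            return null; // a record may carry no value
        }
        Function<byte[], ?> parser = parsers.get(topic);
        if (parser == null) {
            // Unknown topic: hand back the raw text rather than guessing a type.
            return new String(data, StandardCharsets.UTF_8);
        }
        return parser.apply(data);
    }
}
```

In a real project the registered functions would typically call a JSON library with the concrete class for that topic, and the class would implement Kafka's `Deserializer<Object>` so it can be named in `value-deserializer`.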

Upvotes: 2
