Emiter
Emiter

Reputation: 268

Serialization for @KafkaListener

I'm using spring-boot 2.3.2.RELEASE with spring-kafka -> 2.5.4.RELEASE kafka-clients -> 2.5.0

I have the following simple listener

@Slf4j
@Component
class SampleKafkaListener {
    private final ObjectMapper objectMapper;

    @Autowired
    SampleKafkaListener(ObjectMapper objectMapper) {
        this.objectMapper = objectMapper;
    }

    @KafkaListener(topics = "person-topic-as-string")
    public void onMessageReceived(String personMsg) throws JsonProcessingException {
        Person person = objectMapper.readerFor(Person.class).readValue(personMsg);
        log.info("Received person {}", person);
    }

    @KafkaListener(topics = "persons-topic-obj")
    public void onMessageReceived(Person person) {
        log.info("Received person {}", person);
    }
}

And my application.yaml is as simple as possible. Only thing related to kafka is

 spring:
   kafka:
     consumer:
       group-id: mine-group

Now I post simple message like this into those topics

{"firstName": "John", "lastName": "Nakamura"}
  1. using kafkacat -b localhost -t person-topic-as-string Works as expected displays message in logs
  2. when sending same message to persons-topic-obj

It ends up with execption like below. I know it is trying to read type from kafka-headers, but there won't be any. Why spring cannot use default objectMapper that it already has in context? How to configure it so I don't need to invoke serializer manually?

org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [public void com.experiments.kafka.SampleKafkaListener.onMessageReceived(com.experiments.kafka.Person)]
Bean [com.experiments.kafka.SampleKafkaListener@426229a1]; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.lang.String] to [com.experiments.kafka.Person] for GenericMessage [payload={"firstName":"John","lastName":"Nakamura"}, headers={kafka_offset=1, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@7b486f55, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedTopic=persons-topic-obj, kafka_receivedTimestamp=1598347300390, kafka_groupId=mine-group}], failedMessage=GenericMessage [payload={"firstName":"John","lastName":"Nakamura"}, headers={kafka_offset=1, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@7b486f55, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedTopic=persons-topic-obj, kafka_receivedTimestamp=1598347300390, kafka_groupId=mine-group}]; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.lang.String] to [com.experiments.kafka.Person] for GenericMessage [payload={"firstName":"John","lastName":"Nakamura"}, headers={kafka_offset=1, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@7b486f55, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedTopic=persons-topic-obj, kafka_receivedTimestamp=1598347300390, kafka_groupId=mine-group}], failedMessage=GenericMessage [payload={"firstName":"John","lastName":"Nakamura"}, headers={kafka_offset=1, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@7b486f55, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedTopic=persons-topic-obj, kafka_receivedTimestamp=1598347300390, kafka_groupId=mine-group}]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:1923)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeErrorHandler(KafkaMessageListenerContainer.java:1911)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1810)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1737)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1634)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1364)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1080)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:988)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.lang.String] to [com.experiments.kafka.Person] for GenericMessage [payload={"firstName":"John","lastName":"Nakamura"}, headers={kafka_offset=1, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@7b486f55, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedTopic=persons-topic-obj, kafka_receivedTimestamp=1598347300390, kafka_groupId=mine-group}], failedMessage=GenericMessage [payload={"firstName":"John","lastName":"Nakamura"}, headers={kafka_offset=1, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@7b486f55, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedTopic=persons-topic-obj, kafka_receivedTimestamp=1598347300390, kafka_groupId=mine-group}]
        at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:340)
        at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:86)
        at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:51)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:1878)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:1860)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1797)
        ... 8 common frames omitted
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.lang.String] to [com.experiments.kafka.Person] for GenericMessage [payload={"firstName":"John","lastName":"Nakamura"}, headers={kafka_offset=1, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@7b486f55, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedTopic=persons-topic-obj, kafka_receivedTimestamp=1598347300390, kafka_groupId=mine-group}]
        at org.springframework.messaging.handler.annotation.support.PayloadMethodArgumentResolver.resolveArgument(PayloadMethodArgumentResolver.java:145)
        at org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor$KafkaNullAwarePayloadArgumentResolver.resolveArgument(KafkaListenerAnnotationBeanPostProcessor.java:891)
        at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:117)
        at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:148)
        at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:116)
        at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:48)
        at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:329)
        ... 13 common frames omitted

If I use

spring:
  kafka:
    consumer:
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer

Then it fails with nevereding loop of exceptions

java.lang.IllegalStateException: This error handler cannot process 'SerializationException's directly; please consider configuring an 'ErrorHandlingDeserializer' in the value and/or key deserializer
    at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:145)
    at org.springframework.kafka.listener.SeekToCurrentErrorHandler.handle(SeekToCurrentErrorHandler.java:103)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1263)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1020)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition persons-topic-obj-0 at offset 2. If needed, please seek past the record to continue consumption.
Caused by: java.lang.IllegalStateException: No type information in headers and no default type provided
    at org.springframework.util.Assert.state(Assert.java:76)
    at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:483)
    at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1310)
    at org.apache.kafka.clients.consumer.internals.Fetcher.access$3500(Fetcher.java:128)
    at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1541)
    at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1377)
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:677)
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:632)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1315)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1248)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344)
    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:205)
    at com.sun.proxy.$Proxy120.poll(Unknown Source)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1107)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1063)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:988)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java)
    at java.base/java.lang.Thread.run(Thread.java:834)


Upvotes: 0

Views: 1421

Answers (1)

sawim
sawim

Reputation: 1082

By default Kafka Consumer uses StringDeserializer. You need to configure it by setting proper deserializer (spring-kafka already has JsonDeserializer)

spring:
   kafka:
     consumer:
       group-id: mine-group
       value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer

Upvotes: -1

Related Questions