Reputation: 268
I'm using spring-boot 2.3.2.RELEASE with spring-kafka 2.5.4.RELEASE and kafka-clients 2.5.0.
I have the following simple listener:
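import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
class SampleKafkaListener {

    private final ObjectMapper objectMapper;

    @Autowired
    SampleKafkaListener(ObjectMapper objectMapper) {
        this.objectMapper = objectMapper;
    }

    // Works: the payload arrives as a String and is parsed manually.
    @KafkaListener(topics = "person-topic-as-string")
    public void onMessageReceived(String personMsg) throws JsonProcessingException {
        Person person = objectMapper.readerFor(Person.class).readValue(personMsg);
        log.info("Received person {}", person);
    }

    // Fails: the String payload is not converted to Person automatically.
    @KafkaListener(topics = "persons-topic-obj")
    public void onMessageReceived(Person person) {
        log.info("Received person {}", person);
    }
}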
My application.yaml is as simple as possible. The only Kafka-related setting is:
spring:
  kafka:
    consumer:
      group-id: mine-group
Now I post a simple message like this to those topics:
{"firstName": "John", "lastName": "Nakamura"}
This ends with the exception below. I know it is trying to read the type from the Kafka headers, but there won't be any. Why can't Spring use the default ObjectMapper that it already has in the context? How do I configure it so that I don't need to deserialize the payload manually?
org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [public void com.experiments.kafka.SampleKafkaListener.onMessageReceived(com.experiments.kafka.Person)]
Bean [com.experiments.kafka.SampleKafkaListener@426229a1]; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.lang.String] to [com.experiments.kafka.Person] for GenericMessage [payload={"firstName":"John","lastName":"Nakamura"}, headers={kafka_offset=1, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@7b486f55, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedTopic=persons-topic-obj, kafka_receivedTimestamp=1598347300390, kafka_groupId=mine-group}], failedMessage=GenericMessage [payload={"firstName":"John","lastName":"Nakamura"}, headers={kafka_offset=1, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@7b486f55, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedTopic=persons-topic-obj, kafka_receivedTimestamp=1598347300390, kafka_groupId=mine-group}]; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.lang.String] to [com.experiments.kafka.Person] for GenericMessage [payload={"firstName":"John","lastName":"Nakamura"}, headers={kafka_offset=1, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@7b486f55, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedTopic=persons-topic-obj, kafka_receivedTimestamp=1598347300390, kafka_groupId=mine-group}], failedMessage=GenericMessage [payload={"firstName":"John","lastName":"Nakamura"}, headers={kafka_offset=1, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@7b486f55, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedTopic=persons-topic-obj, kafka_receivedTimestamp=1598347300390, kafka_groupId=mine-group}]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:1923)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeErrorHandler(KafkaMessageListenerContainer.java:1911)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1810)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1737)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1634)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1364)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1080)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:988)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.lang.String] to [com.experiments.kafka.Person] for GenericMessage [payload={"firstName":"John","lastName":"Nakamura"}, headers={kafka_offset=1, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@7b486f55, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedTopic=persons-topic-obj, kafka_receivedTimestamp=1598347300390, kafka_groupId=mine-group}], failedMessage=GenericMessage [payload={"firstName":"John","lastName":"Nakamura"}, headers={kafka_offset=1, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@7b486f55, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedTopic=persons-topic-obj, kafka_receivedTimestamp=1598347300390, kafka_groupId=mine-group}]
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:340)
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:86)
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:51)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:1878)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:1860)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1797)
... 8 common frames omitted
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.lang.String] to [com.experiments.kafka.Person] for GenericMessage [payload={"firstName":"John","lastName":"Nakamura"}, headers={kafka_offset=1, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@7b486f55, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedTopic=persons-topic-obj, kafka_receivedTimestamp=1598347300390, kafka_groupId=mine-group}]
at org.springframework.messaging.handler.annotation.support.PayloadMethodArgumentResolver.resolveArgument(PayloadMethodArgumentResolver.java:145)
at org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor$KafkaNullAwarePayloadArgumentResolver.resolveArgument(KafkaListenerAnnotationBeanPostProcessor.java:891)
at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:117)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:148)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:116)
at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:48)
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:329)
... 13 common frames omitted
If I use:
spring:
  kafka:
    consumer:
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
then it fails with a never-ending loop of exceptions:
java.lang.IllegalStateException: This error handler cannot process 'SerializationException's directly; please consider configuring an 'ErrorHandlingDeserializer' in the value and/or key deserializer
at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:145)
at org.springframework.kafka.listener.SeekToCurrentErrorHandler.handle(SeekToCurrentErrorHandler.java:103)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1263)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1020)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition persons-topic-obj-0 at offset 2. If needed, please seek past the record to continue consumption.
Caused by: java.lang.IllegalStateException: No type information in headers and no default type provided
at org.springframework.util.Assert.state(Assert.java:76)
at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:483)
at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1310)
at org.apache.kafka.clients.consumer.internals.Fetcher.access$3500(Fetcher.java:128)
at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1541)
at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1377)
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:677)
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:632)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1315)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1248)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344)
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:205)
at com.sun.proxy.$Proxy120.poll(Unknown Source)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1107)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1063)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:988)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java)
at java.base/java.lang.Thread.run(Thread.java:834)
Upvotes: 0
Views: 1421
Reputation: 1082
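By default the Kafka consumer uses StringDeserializer, so the listener receives a String that cannot be converted to Person. Configure a JSON deserializer instead (spring-kafka already provides JsonDeserializer). Because your messages carry no type headers, the deserializer also needs a default target type; without one it fails with "No type information in headers and no default type provided", as in your second stack trace.
spring:
  kafka:
    consumer:
      group-id: mine-group
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring.json.value.default.type: com.experiments.kafka.Person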
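To make the "No type information in headers and no default type provided" message less mysterious, here is a minimal sketch of the lookup order as I understand it: the type header is consulted first, then a configured default type (the `spring.json.value.default.type` consumer property), and only if both are missing does deserialization fail. The class `TypeResolutionSketch` and the method `resolveTargetType` are hypothetical names for illustration; this is a simplified model, not spring-kafka's actual source.

```java
import java.util.Map;
import java.util.Optional;

/** Simplified, hypothetical model of the target-type lookup; not spring-kafka source code. */
final class TypeResolutionSketch {

    // Header name that spring-kafka's default type mapper uses for the value class.
    static final String TYPE_HEADER = "__TypeId__";

    /**
     * Picks the class name to deserialize into: the type header wins,
     * otherwise the configured default is used, otherwise the lookup fails.
     */
    static String resolveTargetType(Map<String, String> headers, Optional<String> defaultType) {
        String fromHeader = headers.get(TYPE_HEADER);
        if (fromHeader != null) {
            return fromHeader;
        }
        return defaultType.orElseThrow(() -> new IllegalStateException(
                "No type information in headers and no default type provided"));
    }

    public static void main(String[] args) {
        // A record posted without any type header, with a default type configured.
        String resolved = resolveTargetType(Map.of(), Optional.of("com.experiments.kafka.Person"));
        System.out.println(resolved); // prints com.experiments.kafka.Person

        // The same record with no default type: this is the situation in the question.
        try {
            resolveTargetType(Map.of(), Optional.empty());
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Note that the deserializer setting applies to every listener sharing this consumer configuration, so the String-based listener would then receive a Person as well. Another route, if you would rather keep the default StringDeserializer, is to declare a StringJsonMessageConverter bean; as far as I know Spring Boot wires a single RecordMessageConverter bean into the auto-configured listener container factory, which lets the Person parameter be converted from the String payload.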
Upvotes: -1