Swordfish
Swordfish

Reputation: 1281

Handling JSON deserialisation error with Spring KafkaListener

I'm using Spring KafkaListener to consume some messages from a Kafka topic. Looking at the logs, I've noticed an error when trying to deserialise a JSON message to a bean.

The exception thrown is this:

Caused by: com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot deserialize instance of `java.util.ArrayList` out of VALUE_STRING token
 at [Source: (byte[])"{"id": 42, "userId": "", "type": "EMAIL", "status": "CREATE", "creationDate": "2019-02-26T11:13:44.000+0000", "lastModificationDate": "2019-02-26T11:13:44.000+0000", "title": "Your weekly Google Ads update: (Week of 02/11- 02/17)", "subject": "Your weekly Google Ads update: (Week of 02/11- 02/17)", "conversationOn": "", "mainKeyword": "Google Ads App", "keywords": ["Google Ads App", "People", "GBP", "AdWords App", "Google Ireland Ltd"], "actions": [], "emails""[truncated 8510 bytes]; line: 1, column: 752] (through reference chain: event.model.EmailTopicMessage["emails"]->feed.domain.email.Emails["messages"]->java.util.ArrayList[0]->feed.domain.email.Message["from"])
    at com.fasterxml.jackson.databind.exc.MismatchedInputException.from(MismatchedInputException.java:63) ~[jackson-databind-2.9.7.jar!/:2.9.7]
    at com.fasterxml.jackson.databind.DeserializationContext.reportInputMismatch(DeserializationContext.java:1343) ~[jackson-databind-2.9.7.jar!/:2.9.7]
    at com.fasterxml.jackson.databind.DeserializationContext.handleUnexpectedToken(DeserializationContext.java:1139) ~[jackson-databind-2.9.7.jar!/:2.9.7]
    at com.fasterxml.jackson.databind.DeserializationContext.handleUnexpectedToken(DeserializationContext.java:1093) ~[jackson-databind-2.9.7.jar!/:2.9.7]
    at com.fasterxml.jackson.databind.deser.std.CollectionDeserializer.handleNonArray(CollectionDeserializer.java:332) ~[jackson-databind-2.9.7.jar!/:2.9.7]
    at com.fasterxml.jackson.databind.deser.std.CollectionDeserializer.deserialize(CollectionDeserializer.java:265) ~[jackson-databind-2.9.7.jar!/:2.9.7]
    at com.fasterxml.jackson.databind.deser.std.CollectionDeserializer.deserialize(CollectionDeserializer.java:245) ~[jackson-databind-2.9.7.jar!/:2.9.7]
    at com.fasterxml.jackson.databind.deser.std.CollectionDeserializer.deserialize(CollectionDeserializer.java:27) ~[jackson-databind-2.9.7.jar!/:2.9.7]
    at com.fasterxml.jackson.databind.deser.impl.MethodProperty.deserializeAndSet(MethodProperty.java:127) ~[jackson-databind-2.9.7.jar!/:2.9.7]
    at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserializeFromObject(BeanDeserializer.java:369) ~[jackson-databind-2.9.7.jar!/:2.9.7]
    at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:159) ~[jackson-databind-2.9.7.jar!/:2.9.7]
    at com.fasterxml.jackson.databind.deser.std.CollectionDeserializer.deserialize(CollectionDeserializer.java:286) ~[jackson-databind-2.9.7.jar!/:2.9.7]
    at com.fasterxml.jackson.databind.deser.std.CollectionDeserializer.deserialize(CollectionDeserializer.java:245) ~[jackson-databind-2.9.7.jar!/:2.9.7]
    at com.fasterxml.jackson.databind.deser.std.CollectionDeserializer.deserialize(CollectionDeserializer.java:27) ~[jackson-databind-2.9.7.jar!/:2.9.7]
    at com.fasterxml.jackson.databind.deser.impl.MethodProperty.deserializeAndSet(MethodProperty.java:127) ~[jackson-databind-2.9.7.jar!/:2.9.7]
    at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserializeFromObject(BeanDeserializer.java:369) ~[jackson-databind-2.9.7.jar!/:2.9.7]
    at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:159) ~[jackson-databind-2.9.7.jar!/:2.9.7]
    at com.fasterxml.jackson.databind.deser.impl.MethodProperty.deserializeAndSet(MethodProperty.java:127) ~[jackson-databind-2.9.7.jar!/:2.9.7]
    at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserializeFromObject(BeanDeserializer.java:369) ~[jackson-databind-2.9.7.jar!/:2.9.7]
    at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:159) ~[jackson-databind-2.9.7.jar!/:2.9.7]
    at com.fasterxml.jackson.databind.ObjectReader._bindAndClose(ObjectReader.java:1611) ~[jackson-databind-2.9.7.jar!/:2.9.7]
    at com.fasterxml.jackson.databind.ObjectReader.readValue(ObjectReader.java:1234) ~[jackson-databind-2.9.7.jar!/:2.9.7]
    at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:257) ~[spring-kafka-2.1.11.RELEASE.jar!/:2.1.11.RELEASE]
    at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:233) ~[spring-kafka-2.1.11.RELEASE.jar!/:2.1.11.RELEASE]
    at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:923) ~[kafka-clients-1.0.2.jar!/:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher.access$2600(Fetcher.java:93) ~[kafka-clients-1.0.2.jar!/:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1100) ~[kafka-clients-1.0.2.jar!/:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1200(Fetcher.java:949) ~[kafka-clients-1.0.2.jar!/:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:570) ~[kafka-clients-1.0.2.jar!/:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:531) ~[kafka-clients-1.0.2.jar!/:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1154) ~[kafka-clients-1.0.2.jar!/:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1111) ~[kafka-clients-1.0.2.jar!/:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:712) ~[spring-kafka-2.1.11.RELEASE.jar!/:2.1.11.RELEASE]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_171]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_171]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_171]

How do I handle errors and proceed with the next message?

Also what would be best practice on how to handle these type of errors, log to a file or put into another queue for analysis investigation and reprocessing?

Any example code would be highly appreciated.

Upvotes: 1

Views: 5284

Answers (1)

Artem Bilan
Artem Bilan

Reputation: 121272

You need to configure an ErrorHandlingDeserializer2 to wrap that JSON one for better error handling during deserialization.

See Docs for more info: https://docs.spring.io/spring-kafka/docs/2.2.4.RELEASE/reference/#error-handling-deserializer

When a deserializer fails to deserialize a message, Spring has no way to handle the problem, because it occurs before the poll() returns. To solve this problem, version 2.2 introduced the ErrorHandlingDeserializer2. This deserializer delegates to a real deserializer (key or value). If the delegate fails to deserialize the record content, the ErrorHandlingDeserializer2 returns a null value and a DeserializationException in a header that contains the cause and the raw bytes. When you use a record-level MessageListener, if the ConsumerRecord contains a DeserializationException header for either the key or value, the container’s ErrorHandler is called with the failed ConsumerRecord. The record is not passed to the listener.

Upvotes: 1

Related Questions