SRI HARSHA S V S
SRI HARSHA S V S

Reputation: 123

Kafka Json Value Deserializer

I am using a kafka consumer with the below properties:

key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.connect.json.JsonDeserializer

A KafkaProducer(value.serializer=org.apache.kafka.connect.json.JsonSerializer) is pushing JSON records into a Topic and this Consumer is reading from it, Functionality-wise its working fine, but problem comes up when my producer pushes a non-JSON message (eg: empty message).

In this case, The consumer is going down and it will not consume until that empty message is cleared(I've reset the offset of the consumer group to latest).

Is there any way to handle this, maybe using some property or something like that

Upvotes: 1

Views: 8215

Answers (1)

OneCricketeer
OneCricketeer

Reputation: 192023

The Consumer API has no deserialization exception handling properties like Kafka Streams does

You'll need to create your own Deserializer that wraps the json one and handles any errors

You may find the SafeDeserializer class in azkarra-commons to be useful

Upvotes: 1

Related Questions