Jay.Doc
Jay.Doc

Reputation: 61

Using multiple deserializers for a kafka consumer

I am new to kafka and even serialization. until now I was required to handle kafka events of json format serialized using simple code. but now extra events are being added using Avro encoder. so now I want this single consumer to use StringDeserialzer if in json and for Avro its respective deserializer. but how can i map 2 Deserializer in the same properties file?

private Properties getProps(){
    Properties props = new Properties();
    props.put("group.id", env.getProperty("group.id"));
    props.put("enable.auto.commit", env.getProperty("enable.auto.commit"));
    props.put("key.deserializer", env.getProperty("key.deserializer"));
    props.put("value.deserializer", env.getProperty("value.deserializer"));
    return props;
}//here as only value can be mapped to "key.deserializer" is there anyway to do this

in the main method

KafkaConsumer<String, String> _consumer = new KafkaConsumer<>(getProps());
consumers.add(_consumer);
_consumer.subscribe(new ArrayList<>(topicConsumer.keySet()));

Upvotes: 6

Views: 5706

Answers (3)

Michael B&#246;ckling
Michael B&#246;ckling

Reputation: 7922

If your events are arriving in different topics, there is built-in support for DelegatingByTopicDeserializer since 2.8.

See https://docs.spring.io/spring-kafka/reference/kafka/serdes.html#by-topic for details.

Upvotes: 0

Slohrsh
Slohrsh

Reputation: 795

Just write a generic Deserializer which delegates the topics to the matching deserializer.

public class GenericDeserializer extends JsonDeserializer<Object>
{
    public GenericDeserializer()
    {
    }

    @Override
    public Object deserialize(String topic, Headers headers, byte[] data)
    {
        switch (topic)
        {
        case KafkaTopics.TOPIC_ONE:
            TopicOneDeserializer topicOneDeserializer = new TopicOneDeserializer();
            topicOneDeserializer.addTrustedPackages("com.xyz");
            return topicOneDeserializer.deserialize(topic, headers, data);
        case KafkaTopics.TOPIC_TWO:
            TopicTwoDeserializer topicTwoDeserializer= new TopicTwoDeserializer();
            topicTwoDeserializer.addTrustedPackages("com.xyz");
            return topicTwoDeserializer.deserialize(topic, headers, data);
        }
        return super.deserialize(topic, data);
    }
}

Upvotes: 8

Matthias J. Sax
Matthias J. Sax

Reputation: 62350

You need to provide a single hybrid deserializer that is wraps both original deserializer. Internally, the new wrapping deserializer must be able to distinguish between both types of messages and forward the raw bytes to the correct deserializer that does the actual work.

If you cannot know in advance what type of message you have, you can also do a trial an error approach -- ie, hand it to one serializer by default, if this on fails (ie throws an exception) try the second one.

Upvotes: 3

Related Questions