Reputation: 61
I am new to kafka and even serialization. until now I was required to handle kafka events of json format serialized using simple code. but now extra events are being added using Avro encoder. so now I want this single consumer to use StringDeserialzer if in json and for Avro its respective deserializer. but how can i map 2 Deserializer in the same properties file?
private Properties getProps(){
Properties props = new Properties();
props.put("group.id", env.getProperty("group.id"));
props.put("enable.auto.commit", env.getProperty("enable.auto.commit"));
props.put("key.deserializer", env.getProperty("key.deserializer"));
props.put("value.deserializer", env.getProperty("value.deserializer"));
return props;
}//here as only value can be mapped to "key.deserializer" is there anyway to do this
in the main method
KafkaConsumer<String, String> _consumer = new KafkaConsumer<>(getProps());
consumers.add(_consumer);
_consumer.subscribe(new ArrayList<>(topicConsumer.keySet()));
Upvotes: 6
Views: 5706
Reputation: 7922
If your events are arriving in different topics, there is built-in support for DelegatingByTopicDeserializer
since 2.8.
See https://docs.spring.io/spring-kafka/reference/kafka/serdes.html#by-topic for details.
Upvotes: 0
Reputation: 795
Just write a generic Deserializer which delegates the topics to the matching deserializer.
public class GenericDeserializer extends JsonDeserializer<Object>
{
public GenericDeserializer()
{
}
@Override
public Object deserialize(String topic, Headers headers, byte[] data)
{
switch (topic)
{
case KafkaTopics.TOPIC_ONE:
TopicOneDeserializer topicOneDeserializer = new TopicOneDeserializer();
topicOneDeserializer.addTrustedPackages("com.xyz");
return topicOneDeserializer.deserialize(topic, headers, data);
case KafkaTopics.TOPIC_TWO:
TopicTwoDeserializer topicTwoDeserializer= new TopicTwoDeserializer();
topicTwoDeserializer.addTrustedPackages("com.xyz");
return topicTwoDeserializer.deserialize(topic, headers, data);
}
return super.deserialize(topic, data);
}
}
Upvotes: 8
Reputation: 62350
You need to provide a single hybrid deserializer that is wraps both original deserializer. Internally, the new wrapping deserializer must be able to distinguish between both types of messages and forward the raw bytes to the correct deserializer that does the actual work.
If you cannot know in advance what type of message you have, you can also do a trial an error approach -- ie, hand it to one serializer by default, if this on fails (ie throws an exception) try the second one.
Upvotes: 3