Vimal Dhaduk
Vimal Dhaduk

Reputation: 1034

How to create Custom serializer in kafka?

There is only few serializer available like,

org.apache.kafka.common.serialization.StringSerializer

How can we create our own custom serializer ?

Upvotes: 9

Views: 28286

Answers (4)

hcallejas
hcallejas

Reputation: 116

there is an easier way to do it, basically if you are casting your custom class to bytes in your custom serializer, then you are rebuilding the wheel. kafka already works with bytes.

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.utils.Bytes;

Bytes bytes = new Bytes(objectMapper.writeValueAsBytes(<customClass>));
kafkaTemplate.send("topic",bytes);

next in your Producter and Consumer configuration

@Bean
public ProducerFactory<String,String>(){
Map<String, Object> configProps = new HashMap<>();
    configProps.put(
            ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
            "<kafka-server>");
    configProps.put(
            ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            StringSerializer.class);
    configProps.put(
            ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            BytesSerializer.class);
    return new DefaultKafkaProducerFactory<>(configProps);
}


@Bean
public ConsumerFactory<String, String> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
            "host");
    props.put(
            ConsumerConfig.GROUP_ID_CONFIG,
            "group-id");
    props.put(
            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            StringDeserializer.class);
    props.put(
            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            BytesDeserializer.class);
    return new DefaultKafkaConsumerFactory<>(props);
}

finally

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.utils.Bytes;


@KafkaListener(topics = "your.topic")
public void getInfoPersona(Bytes message) throws IOException {
    <your-custom-class> customClass = 
    objectMapper.readValue(message.get(), <your-custom-class>.class);
}

Upvotes: 0

Vladimir
Vladimir

Reputation: 555

No words, only code

  1. Some object, which you sent to Kafka

    import lombok.AllArgsConstructor;
    import lombok.Data;
    import lombok.NoArgsConstructor;
    import lombok.ToString;
    
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    @ToString
    public class TestDto {
    
        private String name;
        private String version;
    
    }
    
  2. Create Serializer, which will be used by Producer

    @Slf4j
    public class KafkaValueSerializer implements Serializer<TestDto> {
    
        private ObjectMapper objectMapper = new ObjectMapper();
    
        @Override
        public void configure(Map<String, ?> configs, boolean isKey) {
        }
    
        @Override
        public byte[] serialize(String topic, TestDto data) {
            try {
                return objectMapper.writeValueAsBytes(data);
            } catch (JsonProcessingException e) {
                log.error("Unable to serialize object {}", data, e);
                return null;
            }
        }
    
        @Override
        public void close() {
        }
    }
    
  3. Of couser, don't foget about Deserialiser for Consumer

    @Slf4j
    public class KafkaValueDeserializer implements Deserializer<TestDto> {
    
        private ObjectMapper objectMapper = new ObjectMapper();
    
        @Override
        public void configure(Map<String, ?> configs, boolean isKey) {
        }
    
        @Override
        public TestDto deserialize(String topic, byte[] data) {
            try {
                return objectMapper.readValue(new String(data, "UTF-8"), TestDto.class);
            } catch (Exception e) {
                log.error("Unable to deserialize message {}", data, e);
                return null;
            }
        }
    
        @Override
        public void close() {
        }
    }
    
  4. Last moment, add serializer/deserializer to application.yml

    spring:
        kafka:
          bootstrap-servers:  192.168.192.168:9092
          producer:
              value-serializer: com.package.service.kafka.KafkaValueSerializer
          consumer:
              group-id: groupId
              value-deserializer: com.package.service.kafka.KafkaValueDeserializer
    

That's all. It's not necessary any configuration file or dancing with a tamboirine :)

  1. Send

    KafkaTemplate<String, TestDto> kafkaTemplate;
    
    TestDto test = new TestDto("test name", "test-version");
    kafkaTemplate.send(topic, testDto);
    
  2. Listen

    @KafkaListener(topics = "${ktp-agent.kafka.request-topic}", groupId = "${spring.kafka.consumer.group-id}")
    public void listen(TestDto message) {
    
        log.info("Received message '{}' from Kafka.", message.toString());
    }
    

Upvotes: 12

amethystic
amethystic

Reputation: 7079

You must create your own serializer which implements the interface Serializer (org.apache.kafka.common.serialization.Serializer) and then set the producer option key.serializer / value.serializer to it.

Upvotes: -2

Luciano Afranllie
Luciano Afranllie

Reputation: 4253

Here you have an example to use your own serializer/deserializer for the Kafka message value. For Kafka message key is the same thing.

We want to send a serialized version of MyMessage as Kafka value and deserialize it again into a MyMessage object at consumer side.

Serializing MyMessage in producer side.

You should create a serializer class that implements org.apache.kafka.common.serialization.Serializer

serialize() method do the work, receiving your object and returning a serialized version as bytes array.

public class MyValueSerializer implements Serializer<MyMessage>
{
    private boolean isKey;

    @Override
    public void configure(Map<String, ?> configs, boolean isKey)
    {
        this.isKey = isKey;
    }

    @Override
    public byte[] serialize(String topic, MyMessage message)
    {
        if (message == null) {
            return null;
        }

        try {

            (serialize your MyMessage object into bytes)

            return bytes;

        } catch (IOException | RuntimeException e) {
            throw new SerializationException("Error serializing value", e);
        }
    }

    @Override
    public void close()
    {

    }
}

final IntegerSerializer keySerializer = new IntegerSerializer();
final MyValueSerializer myValueSerializer = new MyValueSerializer();
final KafkaProducer<Integer, MyMessage> producer = new KafkaProducer<>(props, keySerializer, myValueSerializer);

int messageNo = 1;
int kafkaKey = messageNo;
MyMessage kafkaValue = new MyMessage();
ProducerRecord producerRecord = new ProducerRecord<>(topic, kafkaKey, kafkaValue);
producer.send(producerRecord, new DemoCallBack(logTag, startTime, messageNo, strValue));

Deserializing MyMessage in consumer side.

You should create a deserializer class that implements org.apache.kafka.common.serialization.Deserializer

deserialize() method do the work, receiving serialized value as bytes array and returning your object.

public class MyValueDeserializer implements Deserializer<MyMessage>
{
    private boolean isKey;

    @Override
    public void configure(Map<String, ?> configs, boolean isKey)
    {
        this.isKey = isKey;
    }

    @Override
    public MyMessage deserialize(String s, byte[] value)
    {
        if (value == null) {
            return null;
        }

        try {

            (deserialize value into your MyMessage object)

            MyMessage message = new MyMessage();
            return message;

        } catch (IOException | RuntimeException e) {
            throw new SerializationException("Error deserializing value", e);
        }
    }

    @Override
    public void close()
    {

    }
}

Then use it like this:

final IntegerDeserializer keyDeserializer = new IntegerDeserializer();
final MyValueDeserializer myValueDeserializer = new MyValueDeserializer();
final KafkaConsumer<Integer, MyMessage> consumer = new KafkaConsumer<>(props, keyDeserializer, myValueDeserializer);

ConsumerRecords<Integer, MyMessage> records = consumer.poll(1000);
for (ConsumerRecord<Integer, MyMessage> record : records) {

    int kafkaKey = record.key();
    MyMessage kafkaValue = record.value();

    ...
}

Upvotes: 17

Related Questions