Tim

Reputation: 1471

Spring Kafka JsonSerializer throws an exception when the payload contains a map with a null key

I have created a REST endpoint that pushes a message to Kafka; the details are as follows:

package com.learn.kafka.model;

import com.fasterxml.jackson.annotation.JsonTypeInfo;
import lombok.Data;

import java.util.Map;

@Data
@JsonTypeInfo(
        use = JsonTypeInfo.Id.NAME,
        property = "type")
public class SpecialData {

    Map<String, Object> messageInfo;
}
package com.learn.kafka.service;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import lombok.extern.slf4j.Slf4j;

@Component
@Slf4j
public class ConsumerService {

    @KafkaListener(topics={"#{'${spring.kafka.topic}'}"},groupId="#{'${spring.kafka.consumer.group-id}'}")
    public void consumeMessage(String message){
        log.info("Consumed message - {}",message);
    }
}
package com.learn.kafka.service;

import java.text.MessageFormat;

import com.learn.kafka.model.SpecialData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;

@Service
@Slf4j
public class ProducerService{

    @Value("${spring.kafka.topic:demo-topic}")
    String topicName;
    @Autowired
    KafkaTemplate<String,Object> kafkaTemplate;

    public String sendMessage(SpecialData messageModel) {
        log.info("Sending message from producer - {}", messageModel);
        Message<SpecialData> message = constructMessage(messageModel);
        kafkaTemplate.send(message);
        return MessageFormat.format("Message Sent from Producer - {0}",message);
    }

    private Message<SpecialData> constructMessage(SpecialData messageModel) {

        return MessageBuilder.withPayload(messageModel)
                .setHeader(KafkaHeaders.TOPIC,topicName)
                .setHeader("reason","for-Local-validation")
                .build();
    }
}
package com.learn.kafka.controller;
import com.learn.kafka.model.SpecialData;
import com.learn.kafka.service.ProducerService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import lombok.extern.slf4j.Slf4j;

import java.util.HashMap;
import java.util.Map;

@RestController
@RequestMapping("/api")
@Slf4j
public class MessageController {

    @Autowired
    private ProducerService producerService;

    @GetMapping("/send")
    public void sendMessage(){
        SpecialData messageData = new SpecialData();
        Map<String,Object> input = new HashMap<>();
        input.put(null,"the key is null explicitly");
        input.put("1","the key is one non-null");

        messageData.setMessageInfo(input);

        producerService.sendMessage(messageData);
    }
}
package com.learn.kafka;

import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.databind.ser.std.StdSerializer;
import com.fasterxml.jackson.databind.util.StdDateFormat;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

import java.io.IOException;
import java.util.Map;

public class CustomSerializer implements Serializer<Object> {
    private static final ObjectMapper MAPPER = new ObjectMapper();

    static {
        MAPPER.findAndRegisterModules();
        MAPPER.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
        MAPPER.setDateFormat(new StdDateFormat().withColonInTimeZone(true));
        MAPPER.getSerializerProvider().setNullKeySerializer(new NullKeyKeySerializer());
    }

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
    }

    @Override
    public byte[] serialize(String topic, Object data) {
        try {
            if (data == null){
                System.out.println("Null received at serializing");
                return null;
            }
            System.out.println("Serializing...");
            return MAPPER.writeValueAsBytes(data);
        } catch (Exception e) {
            e.printStackTrace();
            throw new SerializationException("Error when serializing message payload to byte[]", e);
        }
    }

    @Override
    public void close() {
    }

    static class NullKeyKeySerializer extends StdSerializer<Object> {

        public NullKeyKeySerializer() {
            this(null);
        }

        public NullKeyKeySerializer(Class<Object> t) {
            super(t);
        }
        @Override
        public void serialize(Object obj, JsonGenerator gen, SerializerProvider provider) throws IOException {
            gen.writeFieldName("null");
        }

    }
}

spring:
  kafka:
    topic: input-topic
    consumer:
      bootstrap-servers: localhost:9092
      group-id: input-group-id
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      bootstrap-servers: localhost:9092
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: com.learn.kafka.CustomSerializer
      properties:
        spring.json.add.type.headers: false

Reference for null key serializer implementation

Upvotes: 1

Views: 1938

Answers (2)

Tim

Reputation: 1471

Adding the code details to @Gary Russell's answer below.

  • In my scenario I only need the serialization side; I don't need to deserialize the payload back into a specific type, so the consumer is unaffected. The custom serializer is shown below.
package com.learn.kafka;

import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.databind.ser.std.StdSerializer;
import org.springframework.kafka.support.serializer.JsonSerializer;

import java.io.IOException;

public class CustomJsonSerializer extends JsonSerializer<Object> {

    public CustomJsonSerializer() {
        super(customizedObjectMapper());
    }

    private static ObjectMapper customizedObjectMapper() {
        ObjectMapper objMapper= new ObjectMapper();
        // register Jackson modules (e.g. JavaTimeModule) in case the payload contains timestamps
        objMapper.findAndRegisterModules();

        // write null map keys as the literal field name "null" instead of throwing an exception
        objMapper.getSerializerProvider().setNullKeySerializer(new NullKeySerializer());
        return objMapper;
    }

    static class NullKeySerializer extends StdSerializer<Object> {
        public NullKeySerializer() {
            this(null);
        }

        public NullKeySerializer(Class<Object> t) {
            super(t);
        }

        @Override
        public void serialize(Object nullKey, JsonGenerator generator, SerializerProvider unused)
                throws IOException {
            generator.writeFieldName("null");
        }
    }
}
  • Reference the class as the producer value-serializer in application.yaml:

spring:
  kafka:
    bootstrap-servers: domain:9092
    producer:
      value-serializer: com.learn.kafka.CustomJsonSerializer

Upvotes: 1

Gary Russell

Reputation: 174574

You don't need to extend the deserializer, it already has a constructor that takes a custom ObjectMapper; simply create one in Java and add it to the consumer factory using setValueDeserializer(). (There are similar setters for serializers on the producer factory).

However, extending the class will allow you to configure it in the yaml, if that is what you prefer.
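For completeness, here is a minimal sketch of that programmatic approach on the producer side. The configuration class, bean names, and bootstrap server are my own illustration, not from the answer; it reuses the NullKeySerializer nested class from the answer above.

package com.learn.kafka;

import java.util.HashMap;
import java.util.Map;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        // build the customized ObjectMapper in Java instead of extending JsonSerializer
        ObjectMapper mapper = new ObjectMapper();
        mapper.findAndRegisterModules();
        // reuse the NullKeySerializer nested class from the answer above
        mapper.getSerializerProvider()
              .setNullKeySerializer(new CustomJsonSerializer.NullKeySerializer());

        DefaultKafkaProducerFactory<String, Object> factory = new DefaultKafkaProducerFactory<>(props);
        // hand the mapper to the stock Spring Kafka JsonSerializer and register it on the factory
        factory.setValueSerializer(new JsonSerializer<>(mapper));
        return factory;
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

With this in place the value-serializer entry in the YAML is no longer needed, because the factory supplies the serializer instance directly.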

Upvotes: 1
