Muhammad Sufyian
Muhammad Sufyian

Reputation: 78

Problems with deserialization when consuming message from Kafka topic

I am using Kafka Consumer API to build the consumer.The message structure is complex. To build the deserializers I have implemented the Deserializer class and provide necessary implementations.I am using Jackson API for deserializing. I am getting this error "Exception raisedorg.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition staging.datafeeds.PartnerHotel-0 at offset 19205124"

#POJO classes

    public class Change {
    private  Schema schema;
    private  Payload payload;
    //Getters and constructor
    }
    public class Details {
    private List<String> effectedAttributes;
    private List<PartnerHotel> cluster;
    //Getters and contructor
    }
    public class Field {
    private String type;
    private Boolean optional;
    private String field;
    //Getters and constructor
    }
    public class Fields {
    private String type;
    private List<Field> fields;
    private Boolean optional;
    private String name;
    //Getters and contructor
    }
    public class Geom{
    private int srid;
    private String wkb;
    //Getters and contructor
    }
    public class PartnerHotel{
    private int id;
    private int shopId;
    private String partnerHotelId;
    private boolean isOnline;
    private boolean isRemovedByUser;
    private int mappingPriority;
    private int hotelId;
    private String statusHotelId;
    private String name;
    private String street;
    private String zipCode;
    private String city;
    private String sourceCityId;
    private String state;
    private String stateAlpha2;
    private String country;
    private String alpha2;
    private String alpha3;
    private double latitude;
    private double longitude;
    private Geom geomPoint;
    private int countryIdShop;
    private int selectedGeoname;
    private String propertyType;
    private List<String> tags;
    private int stars;
    private String url;
    private int nrRatings;
    private double recommendation;
    private long dateHotelId;
    private long timeStamp;
    private long lastImport;
    //Getters and contructor
    }
    public class Payload {
    private PartnerHotel before;
    private PartnerHotel after;
    private Source source;
    private String op;
    private String ts_ms;
    //Getters and contructor
    }
    public class Schema {
    private String type;
    private Boolean optional;
    private String name;
    private List<Fields> fields;
    //Getters and contructor
    }
    public class Source {
    private String version;
    private String name;
    private String ts_usec;
    private String txId;
    private String lxn;
    private Boolean snapshot;
    private Object lastSnapshotRecord;
    //Getters and contructor
    }

#Deserializer

    public class ChangeDeserializer implements Deserializer<Change> {

    public ChangeDeserializer(){ }

    public void configure(Map<String, ?> map, boolean b) {}

    public Change deserialize(String topic, byte[] data) {
        if(data == null){
            return null;
        }
        try{
            ObjectMapper objectMapper = new ObjectMapper();
            Change change = objectMapper.readValue(data,Change.class);
            return change;
        }
        catch(IOException exception){
            throw new DeserializationException("Unable to deserialize               Change", exception);
        }}

    public void close() {}
    }

#Consumer
    public class KafkaAcnowledger {
        public static void main(String[] args){
        Properties props = new Properties();
        props.put("bootstrap.servers", "someUrl");
        props.put("group.id", "test131");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("max.poll.records",1);
        props.put("auto.offset.reset","earliest");
        props.put("key.deserializer",    "org.apache.kafka.common.serialization.LongDeserializer");
        props.put("value.deserializer",    "deserializer.ChangeDeserializer");
        KafkaConsumer<Long, Change> consumer = new KafkaConsumer(props);
        consumer.subscribe(Arrays.asList("staging.datafeeds.PartnerHotel"));
        while (true) {
            try{
            ConsumerRecords<Long, Change> records = consumer.poll(100);
            for (ConsumerRecord<Long, Change> record : records)
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        }
        catch(Exception exception){
                System.out.println("Exception raised" + exception);
        }
        }


    }
    }

The poll() in the consumer looks fine , and the enter code hereexception I am getting a Serialization exception . I checked the consumer group via kafka-consumer-groups.sh , the group of this consumer is there in the list.Any direction is appreciated .

Structure of the message in the Kafka topic:

{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"int16","optional":false,"field":"shopId"},{"type":"string","optional":false,"field":"partnerHotelId"},{"type":"boolean","optional":false,"field":"isOnline"},{"type":"boolean","optional":false,"field":"isRemovedByUser"},{"type":"int32","optional":false,"field":"mappingPriority"},{"type":"int32","optional":true,"field":"hotelId"},{"type":"string","optional":true,"field":"statusHotelId"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"dateHotelId"},{"type":"int64","optional":false,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"timestamp"},{"type":"int64","optional":false,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"lastImport"},{"type":"string","optional":true,"field":"name"},{"type":"string","optional":true,"field":"street"},{"type":"string","optional":true,"field":"zipcode"},{"type":"string","optional":true,"field":"city"},{"type":"string","optional":true,"field":"sourceCityId"},{"type":"string","optional":true,"field":"state"},{"type":"string","optional":true,"field":"stateAlpha2"},{"type":"string","optional":true,"field":"country"},{"type":"string","optional":true,"field":"alpha2"},{"type":"string","optional":true,"field":"alpha3"},{"type":"double","optional":true,"field":"latitude"},{"type":"double","optional":true,"field":"longitude"},{"type":"struct","fields":[{"type":"bytes","optional":false,"field":"wkb"},{"type":"int32","optional":true,"field":"srid"}],"optional":true,"name":"io.debezium.data.geometry.Geometry","version":1,"doc":"Geometry","field":"geomPoint"},{"type":"array","items":{"type":"int32","optional":true},"optional":false,"field":"proposedGeonames"},{"type":"int32","optional":true,"field":"countryIdShop"},{"type":"int32","optional":true,"field":"selectedGeoname"},{"type":"string","optional":true,"field":"propertyType"},{"type":"array","items":{"type":"string","optional":true},"optional":false,"field":"tags"},{"type":"array","items":{"type":"string","optional":true},"optional":false,"field":"chains"},{"type":"array","items":{"type":"string","optional":true},"optional":false,"field":"creditCards"},{"type":"int32","optional":true,"field":"stars"},{"type":"string","optional":true,"field":"url"},{"type":"int32","optional":true,"field":"nrRatings"},{"type":"double","optional":true,"field":"recommendation"},{"type":"array","items":{"type":"int32","optional":true},"optional":false,"field":"proposedHotels"},{"type":"array","items":{"type":"int32","optional":true},"optional":false,"field":"proposedPartnerHotels"},{"type":"array","items":{"type":"int32","optional":true},"optional":false,"field":"removedFromHotelIds"}],"optional":true,"name":"staging.datafeeds.PartnerHotel.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"int16","optional":false,"field":"shopId"},{"type":"string","optional":false,"field":"partnerHotelId"},{"type":"boolean","optional":false,"field":"isOnline"},{"type":"boolean","optional":false,"field":"isRemovedByUser"},{"type":"int32","optional":false,"field":"mappingPriority"},{"type":"int32","optional":true,"field":"hotelId"},{"type":"string","optional":true,"field":"statusHotelId"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"dateHotelId"},{"type":"int64","optional":false,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"timestamp"},{"type":"int64","optional":false,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"lastImport"},{"type":"string","optional":true,"field":"name"},{"type":"string","optional":true,"field":"street"},{"type":"string","optional":true,"field":"zipcode"},{"type":"string","optional":true,"field":"city"},{"type":"string","optional":true,"field":"sourceCityId"},{"type":"string","optional":true,"field":"state"},{"type":"string","optional":true,"field":"stateAlpha2"},{"type":"string","optional":true,"field":"country"},{"type":"string","optional":true,"field":"alpha2"},{"type":"string","optional":true,"field":"alpha3"},{"type":"double","optional":true,"field":"latitude"},{"type":"double","optional":true,"field":"longitude"},{"type":"struct","fields":[{"type":"bytes","optional":false,"field":"wkb"},{"type":"int32","optional":true,"field":"srid"}],"optional":true,"name":"io.debezium.data.geometry.Geometry","version":1,"doc":"Geometry","field":"geomPoint"},{"type":"array","items":{"type":"int32","optional":true},"optional":false,"field":"proposedGeonames"},{"type":"int32","optional":true,"field":"countryIdShop"},{"type":"int32","optional":true,"field":"selectedGeoname"},{"type":"string","optional":true,"field":"propertyType"},{"type":"array","items":{"type":"string","optional":true},"optional":false,"field":"tags"},{"type":"array","items":{"type":"string","optional":true},"optional":false,"field":"chains"},{"type":"array","items":{"type":"string","optional":true},"optional":false,"field":"creditCards"},{"type":"int32","optional":true,"field":"stars"},{"type":"string","optional":true,"field":"url"},{"type":"int32","optional":true,"field":"nrRatings"},{"type":"double","optional":true,"field":"recommendation"},{"type":"array","items":{"type":"int32","optional":true},"optional":false,"field":"proposedHotels"},{"type":"array","items":{"type":"int32","optional":true},"optional":false,"field":"proposedPartnerHotels"},{"type":"array","items":{"type":"int32","optional":true},"optional":false,"field":"removedFromHotelIds"}],"optional":true,"name":"staging.datafeeds.PartnerHotel.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":true,"field":"version"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":false,"field":"db"},{"type":"int64","optional":true,"field":"ts_usec"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"string","optional":true,"field":"schema"},{"type":"string","optional":true,"field":"table"},{"type":"boolean","optional":true,"default":false,"field":"snapshot"},{"type":"boolean","optional":true,"field":"last_snapshot_record"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"}],"optional":false,"name":"staging.datafeeds.PartnerHotel.Envelope"},"payload":{"before":null,"after":{"id":13893497,"shopId":135,"partnerHotelId":"6-42036","isOnline":false,"isRemovedByUser":false,"mappingPriority":0,"hotelId":null,"statusHotelId":"AUTO","dateHotelId":null,"timestamp":1529334013938327,"lastImport":1503491984188866,"name":"Ferienvermietung Wiedemann","street":"Chausseeberg 3","zipcode":"17429","city":"Mellenthin","sourceCityId":null,"state":null,"stateAlpha2":null,"country":"Deutschland","alpha2":"DE","alpha3":null,"latitude":53.920278,"longitude":14.013333,"geomPoint":{"wkb":"AQEAACDmEAAARuo9ldMGLEA5nWSry/VKQA==","srid":4326},"proposedGeonames":[2872064],"countryIdShop":83,"selectedGeoname":2872064,"propertyType":null,"tags":["77","36","33","34","38","43","41","123","26","29","1","7","6","70","9","1000","58","17","18","15","13","14","20","65","63","46","10","52"],"chains":[],"creditCards":[],"stars":null,"url":"http://www.buchen.travel/onepage-idealo-booking/index.php?room=6-42036","nrRatings":null,"recommendation":null,"proposedHotels":[],"proposedPartnerHotels":[],"removedFromHotelIds":[]},"source":{"version":"0.8.3.Final","name":"staging","db":"geo","ts_usec":1554391067119000,"txId":4757138,"lsn":1139303143104,"schema":"datafeeds","table":"PartnerHotel","snapshot":true,"last_snapshot_record":false},"op":"r","ts_ms":1554391067119}}

Upvotes: 0

Views: 6156

Answers (1)

Bartosz Wardziński
Bartosz Wardziński

Reputation: 6613

You POJO is not compatible with your message and jackson cannot parse it. At least there is lack of few fields, following error can be found.

Unrecognized field "timestamp" (class  com.example.kafka.Change$PartnerHotel), not marked as ignorable
Unrecognized field "zipcode" (class  com.example.kafka.Change$PartnerHotel), not marked as ignorable
Unrecognized field "proposedGeonames" (class  com.example.kafka.Change$PartnerHotel), not marked as ignorable
Unrecognized field "chains" (class  com.example.kafka.Change$PartnerHotel), not marked as ignorable
Unrecognized field "creditCards" (class  com.example.kafka.Change$PartnerHotel), not marked as ignorable
Unrecognized field "proposedHotels" (class  com.example.kafka.Change$PartnerHotel), not marked as ignorable
Unrecognized field "proposedPartnerHotels" (class  com.example.kafka.Change$PartnerHotel), not marked as ignorable
Unrecognized field "removedFromHotelIds" (class  com.example.kafka.Change$PartnerHotel), not marked as ignorable
Unrecognized field "db" (class  com.example.kafka.Change$Source), not marked as ignorable
Unrecognized field "lsn" (class  com.example.kafka.Change$Source), not marked as ignorable
Unrecognized field "schema" (class  com.example.kafka.Change$Source), not marked as ignorable
Unrecognized field "table" (class  com.example.kafka.Change$Source), not marked as ignorable
Unrecognized field "last_snapshot_record" (class  com.example.kafka.Change$Source), not marked as ignorable

To fix it you have to add those fields to your POJO or disable fail on unknown: objectMapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);. More regarding jackson deserialization error can be found here: jackson Unrecognized field

Upvotes: 2

Related Questions