madhairsilence

Reputation: 3870

Flink Kafka - Custom Class Data is always null

Custom Class

Person

class Person {
    private Integer id;
    private String name;
    // getters and setters
}

Kafka Flink Connector

TypeInformation<Person> info = TypeInformation.of(Person.class);
TypeInformationSerializationSchema<Person> schema = new TypeInformationSerializationSchema<>(info, new ExecutionConfig());
DataStream<Person> input = env.addSource(new FlinkKafkaConsumer08<>("persons", schema, getKafkaProperties()));

Now, if I send the JSON below

{ "id" : 1, "name": "Synd" }

through the Kafka console producer, the Flink code throws a NullPointerException. But if I use SimpleStringSchema instead of the custom schema defined above, the stream is printed.

What is wrong with the above setup?

Upvotes: 0

Views: 840

Answers (2)

madhairsilence

Reputation: 3870

An answer for anyone who has the same question:

Custom deserialization schema

class PersonSchema implements DeserializationSchema<Person>{

    private final ObjectMapper mapper = new ObjectMapper(); // com.fasterxml.jackson.databind.ObjectMapper

    @Override
    public Person deserialize(byte[] bytes) throws IOException {
        return mapper.readValue( bytes, Person.class );
    }

    @Override
    public boolean isEndOfStream(Person person) {
        return false;
    }

    @Override
    public TypeInformation<Person> getProducedType() {
        return TypeInformation.of(new TypeHint<Person>(){});
    }
}

Using the schema

DataStream<Person> input = env.addSource( new FlinkKafkaConsumer08<>("persons", new PersonSchema() , getKafkaProperties()));

Upvotes: 0

Till Rohrmann

Reputation: 13346

TypeInformationSerializationSchema is a de-/serialization schema that uses Flink's serialization stack and, thus, its serializers. When you use this schema, Flink therefore expects the data to have been serialized with Flink's serializer for the Person type.

Given the excerpt of the Person class, Flink will most likely use its PojoSerializer. This serializer reads Flink's own binary format, so it cannot make sense of JSON input.

If you want to use JSON as the input format, then you have to define your own DeserializationSchema which can parse JSON into Person.
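
To illustrate the mismatch described above, here is a minimal, standard-library-only sketch. It assumes nothing about Flink's real wire format: java.io.DataOutputStream merely stands in for "some binary serializer", and the class name BinaryVsJsonDemo and its methods are hypothetical. It shows that bytes written in a binary layout round-trip, while the UTF-8 bytes of the JSON text do not fit that layout.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Optional;

// Hypothetical demo class. DataOutputStream only stands in for a binary
// serializer; this is NOT Flink's actual serialization format.
final class BinaryVsJsonDemo {

    // Encode (id, name) as a 4-byte int followed by a length-prefixed string.
    static byte[] writeBinary(int id, String name) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            out.writeInt(id);
            out.writeUTF(name);
        } catch (IOException e) {
            throw new UncheckedIOException(e); // not expected for an in-memory stream
        }
        return buffer.toByteArray();
    }

    // Try to decode the layout written by writeBinary.
    // Returns an empty Optional if the bytes do not fit that layout.
    static Optional<String> tryReadBinary(byte[] bytes) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
            int id = in.readInt();
            String name = in.readUTF();
            return Optional.of(id + ":" + name);
        } catch (IOException e) {
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        byte[] binary = writeBinary(1, "Synd");
        byte[] json = "{ \"id\" : 1, \"name\": \"Synd\" }".getBytes(StandardCharsets.UTF_8);

        System.out.println(tryReadBinary(binary)); // bytes in the expected binary layout
        System.out.println(tryReadBinary(json));   // JSON text fed to the binary reader
    }
}
```

In this sketch the JSON bytes are rejected because the reader interprets the first characters as an int and a string length that runs past the end of the input. A binary reader will not always fail this cleanly; it may also decode garbage, which is why the input format and the schema have to match.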

Upvotes: 1
