Reputation: 3870
Custom class Person:
class Person {
    private Integer id;
    private String name;
    // getters and setters
}
Flink Kafka connector:
TypeInformation<Person> info = TypeInformation.of(Person.class);
TypeInformationSerializationSchema<Person> schema =
        new TypeInformationSerializationSchema<>(info, new ExecutionConfig());
DataStream<Person> input = env.addSource(
        new FlinkKafkaConsumer08<>("persons", schema, getKafkaProperties()));
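For reference, getKafkaProperties() is only a helper returning the usual consumer settings; a minimal sketch (the broker address, ZooKeeper address, and group id are placeholder values, not from the original post):

private static Properties getKafkaProperties() {
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker
    props.setProperty("zookeeper.connect", "localhost:2181"); // required by the 0.8 consumer
    props.setProperty("group.id", "person-consumer");         // placeholder group id
    return props;
}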
Now if I send the JSON below
{ "id": 1, "name": "Synd" }
through the Kafka console producer, the Flink job throws a NullPointerException.
But if I use SimpleStringSchema instead of the custom schema defined above, the stream gets printed.
What is wrong with the above setup?
Upvotes: 0
Views: 840
Reputation: 3870
Answer for anyone who has the same question:
Custom deserializer:
class PersonSchema implements DeserializationSchema<Person> {

    // com.fasterxml.jackson.databind.ObjectMapper
    private final ObjectMapper mapper = new ObjectMapper();

    @Override
    public Person deserialize(byte[] bytes) throws IOException {
        return mapper.readValue(bytes, Person.class);
    }

    @Override
    public boolean isEndOfStream(Person person) {
        return false;
    }

    @Override
    public TypeInformation<Person> getProducedType() {
        return TypeInformation.of(new TypeHint<Person>() {});
    }
}
Using the schema:
DataStream<Person> input = env.addSource(
        new FlinkKafkaConsumer08<>("persons", new PersonSchema(), getKafkaProperties()));
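To check the schema end to end, you can print the stream and run the job (the job name below is an arbitrary placeholder):

input.print();
env.execute("person-json-job");

Sending { "id": 1, "name": "Synd" } through the console producer should then print the deserialized Person.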
Upvotes: 0
Reputation: 13346
The TypeInformationSerializationSchema is a de-/serialization schema which uses Flink's serialization stack and, thus, also its serializers. Therefore, when using this schema, Flink expects that the data has been serialized with Flink's serializer for the Person type.
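To illustrate the point: the consumer above would only work if the records had been written by something using the same Flink serializer, e.g. a Flink producer job like the following sketch (it assumes a Person(id, name) constructor, which the excerpt above doesn't show):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
TypeInformationSerializationSchema<Person> schema =
        new TypeInformationSerializationSchema<>(TypeInformation.of(Person.class), env.getConfig());
// writes Flink-serialized bytes, which the consumer's schema can then decode
env.fromElements(new Person(1, "Synd"))
        .addSink(new FlinkKafkaProducer08<>("persons", schema, getKafkaProperties()));
env.execute("person-writer");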
Given the excerpt of the Person class, Flink will most likely use its PojoTypeSerializer. JSON input data won't be understood by this serializer.
If you want to use JSON as the input format, then you have to define your own DeserializationSchema which can parse JSON into Person.
Upvotes: 1