I have been trying to serialize an Avro generic record and send the Avro-serialized data to Kafka. The main goal is to avoid the Confluent Schema Registry for storing the schema. Instead, I want to send the schema along with the serialized data, so that it can be extracted from the Kafka topic and used for deserialization.
Below is the part of my AvroSerializer that generates the Avro data.
@Override
public byte[] serialize(String topic, T data) {
    // T is assumed to extend GenericRecord, so data.getSchema() is available
    try {
        byte[] result = null;
        if (data != null) {
            LOGGER.debug("data='{}'", data);
            ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
            BinaryEncoder binaryEncoder =
                    EncoderFactory.get().binaryEncoder(byteArrayOutputStream, null);
            // The writer schema is passed to the constructor; a separate setSchema() call is redundant
            DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(data.getSchema());
            datumWriter.write(data, binaryEncoder);
            binaryEncoder.flush();
            byteArrayOutputStream.close();
            // Only the binary-encoded field values end up here; the schema itself is not written
            result = byteArrayOutputStream.toByteArray();
        }
        return result;
    } catch (IOException ex) {
        throw new SerializationException(
                "Can't serialize data='" + data + "' for topic='" + topic + "'", ex);
    }
}
The serialized data in the Kafka topic looks like this (a snapshot of the console consumer).
The AvroDeserializer looks like this:
@Override
public T deserialize(String topic, byte[] data) {
    try {
        T result = null;
        if (data != null) {
            LOGGER.debug("data='{}'", DatatypeConverter.printHexBinary(data));
            // schemaString must already hold the writer schema JSON (assumed to be a field of this class);
            // Schema.parse(String) is deprecated in favour of Schema.Parser
            Schema schema = new Schema.Parser().parse(schemaString);
            DatumReader<GenericRecord> datumReader = new GenericDatumReader<>(schema);
            Decoder decoder = DecoderFactory.get().binaryDecoder(data, null);
            result = (T) datumReader.read(null, decoder);
            LOGGER.debug(result.getSchema().toString());
            LOGGER.debug("deserialized data='{}'", result);
        }
        return result;
    } catch (Exception ex) {
        throw new SerializationException(
                "Can't deserialize data '" + Arrays.toString(data) + "' from topic '" + topic + "'", ex);
    }
}
The producer is shown below:
import java.io.IOException;
import java.util.Properties;
import java.util.concurrent.Future;

import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

public class KafkaAvroProducerUtil {
    public Future<RecordMetadata> produceTokafka(GenericRecord object) throws IOException {
        Properties properties = new Properties();
        // normal producer
        properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
        properties.setProperty("acks", "all");
        properties.setProperty("retries", "10");
        // avro part: AvroSerializer is the custom serializer shown above
        properties.setProperty("key.serializer", StringSerializer.class.getName());
        properties.setProperty("value.serializer", AvroSerializer.class.getName());
        String topic = "avro";
        Producer<String, GenericRecord> producer = new KafkaProducer<>(properties);
        ProducerRecord<String, GenericRecord> producerRecord = new ProducerRecord<>(topic, object);
        Future<RecordMetadata> data = producer.send(producerRecord, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception == null) {
                    System.out.println(metadata);
                } else {
                    exception.printStackTrace();
                }
            }
        });
        producer.flush();
        producer.close();
        return data;
    }
}
When I try to deserialize this, it says the schema is needed. As far as I understand, the problem is that the schema is not sent along with the data, as you can see in the image above (a snapshot of the console consumer running in cmd). How can I send the schema along with the data, so that the consumer can deserialize each record using the schema that came with it?
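To see why the consumer cannot decode these bytes on its own, it helps to look at what Avro's binary encoding actually contains. The sketch below hand-encodes a hypothetical record with fields name (string) and age (int) following the rules in the Avro specification: ints and longs are zig-zag varints, strings are a length prefix followed by UTF-8 bytes, and fields are written in schema order. It does not use the Avro library, and the class and method names are illustrative only. (Avro writes string lengths as a long; for lengths that fit in an int the bytes are identical.) The output has no field names or types in it, so decoding only works if the reader already knows the writer schema.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Illustration only: hand-rolled Avro binary encoding for a hypothetical
// record schema {"name": string, "age": int}. This is not the Avro library API.
final class AvroBinarySketch {

    // Avro int/long: zig-zag transform, then 7 bits per byte; high bit set = more bytes follow.
    static void writeVarInt(ByteArrayOutputStream out, int n) {
        int z = (n << 1) ^ (n >> 31);
        while ((z & ~0x7F) != 0) {
            out.write((z & 0x7F) | 0x80);
            z >>>= 7;
        }
        out.write(z);
    }

    // Inverse of writeVarInt; pos[0] is advanced past the bytes consumed.
    static int readVarInt(byte[] buf, int[] pos) {
        int z = 0;
        int shift = 0;
        int b;
        do {
            b = buf[pos[0]++] & 0xFF;
            z |= (b & 0x7F) << shift;
            shift += 7;
        } while ((b & 0x80) != 0);
        return (z >>> 1) ^ -(z & 1);
    }

    // Fields are written in schema order: no field names, no types, no schema.
    static byte[] encode(String name, int age) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] utf8 = name.getBytes(StandardCharsets.UTF_8);
        writeVarInt(out, utf8.length); // string = length prefix + UTF-8 bytes
        out.write(utf8, 0, utf8.length);
        writeVarInt(out, age);
        return out.toByteArray();
    }

    // Decoding works only because this code already "knows" the schema:
    // first a string, then an int.
    static Object[] decode(byte[] buf) {
        int[] pos = {0};
        int len = readVarInt(buf, pos);
        String name = new String(buf, pos[0], len, StandardCharsets.UTF_8);
        pos[0] += len;
        int age = readVarInt(buf, pos);
        return new Object[] {name, age};
    }

    public static void main(String[] args) {
        byte[] bytes = encode("Alice", 30);
        System.out.println(Arrays.toString(bytes));         // [10, 65, 108, 105, 99, 101, 60]
        System.out.println(Arrays.toString(decode(bytes))); // [Alice, 30]
    }
}
```

The seven bytes for "Alice", 30 are just a length, the characters and a number, which matches what the console consumer shows: readable string values with no schema around them.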
EDIT: I tried two approaches, following the suggestions of @OneCricketeer and @ChinHuang.
Both approaches are explained below, but code is shown only for the header approach.
APPROACH 1: Sending schema along with data
In this approach I serialized the Avro schema as a string, appended a delimiter followed by the serialized data, and sent the result to the Kafka topic.
When deserializing, after reading the record from the Kafka topic, I split the byte array into schema and data at the delimiter. I then converted the schema bytes back into a Schema and used that schema to deserialize the data.
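A minimal sketch of that framing, assuming the schema JSON is written first as UTF-8, followed by a single 0x00 delimiter byte and then the Avro binary payload. The delimiter value and the names SchemaFraming, frame, schemaOf and payloadOf are hypothetical choices, not taken from the original code. The important detail is splitting on the first delimiter only: the Avro payload can legitimately contain 0x00 bytes, whereas JSON text from a standard serializer escapes control characters, so a raw NUL should not appear in the schema part.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Sketch of "schema + delimiter + data" framing (approach 1).
// DELIMITER and all names here are hypothetical choices for illustration.
final class SchemaFraming {

    static final byte DELIMITER = 0x00;

    // frame = UTF-8 schema JSON, then one DELIMITER byte, then the Avro binary payload
    static byte[] frame(String schemaJson, byte[] avroPayload) {
        byte[] schemaBytes = schemaJson.getBytes(StandardCharsets.UTF_8);
        byte[] out = new byte[schemaBytes.length + 1 + avroPayload.length];
        System.arraycopy(schemaBytes, 0, out, 0, schemaBytes.length);
        out[schemaBytes.length] = DELIMITER;
        System.arraycopy(avroPayload, 0, out, schemaBytes.length + 1, avroPayload.length);
        return out;
    }

    // The schema ends at the FIRST delimiter; the payload itself may contain 0x00 bytes.
    static int delimiterIndex(byte[] frame) {
        for (int i = 0; i < frame.length; i++) {
            if (frame[i] == DELIMITER) {
                return i;
            }
        }
        throw new IllegalArgumentException("no schema delimiter found");
    }

    static String schemaOf(byte[] frame) {
        return new String(frame, 0, delimiterIndex(frame), StandardCharsets.UTF_8);
    }

    static byte[] payloadOf(byte[] frame) {
        return Arrays.copyOfRange(frame, delimiterIndex(frame) + 1, frame.length);
    }

    public static void main(String[] args) {
        String schemaJson = "{\"type\":\"string\"}";
        byte[] payload = {10, 0, 65, 0}; // arbitrary bytes, including 0x00
        byte[] framed = frame(schemaJson, payload);
        System.out.println(schemaOf(framed));                   // {"type":"string"}
        System.out.println(Arrays.toString(payloadOf(framed))); // [10, 0, 65, 0]
    }
}
```

On the consumer side, schemaOf(...) would go to new Schema.Parser().parse(...) and payloadOf(...) to DecoderFactory.get().binaryDecoder(...), as in the deserializer in the question. Note that every record then carries the full schema text.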
Cons of the approach: as @OneCricketeer said
APPROACH 2: Sending schema in the header
Here, rather than sending the schema along with the data, the schema is sent in a record header.
The methods in the Serializer class are shown below.
@Override
public byte[] serialize(String topic, T data) {
    // Without headers there is nowhere to put the schema, so fail loudly instead of silently sending null
    throw new UnsupportedOperationException("use serialize(topic, headers, data)");
}

@Override
public byte[] serialize(String topic, Headers headers, T data) {
    try {
        byte[] payload = null;
        if (data != null) {
            LOGGER.debug("data='{}'", data);
            ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
            BinaryEncoder binaryEncoder =
                    EncoderFactory.get().binaryEncoder(byteArrayOutputStream, null);
            DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(data.getSchema());
            datumWriter.write(data, binaryEncoder);
            binaryEncoder.flush();
            // The record value is the plain Avro binary payload (no extra copy needed)
            payload = byteArrayOutputStream.toByteArray();
            // The writer schema travels as UTF-8 JSON in a header named "schema"
            byte[] schemaBytes = data.getSchema().toString().getBytes(StandardCharsets.UTF_8);
            headers.add("schema", schemaBytes);
            LOGGER.info("headers added");
        }
        return payload;
    } catch (IOException ex) {
        throw new SerializationException(
                "Can't serialize data='" + data + "' for topic='" + topic + "'", ex);
    }
}
The Deserializer methods are shown below.
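@Override
public T deserialize(String topic, byte[] data) {
    // The schema only exists in the headers, so this overload cannot decode anything
    throw new UnsupportedOperationException("use deserialize(topic, headers, data)");
}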
@Override
public T deserialize(String topic, Headers headers, byte[] data) {
    try {
        T result = null;
        if (data != null) {
            LOGGER.debug("data='{}'", DatatypeConverter.printHexBinary(data));
            // Read back the writer schema that the serializer put in the "schema" header
            Header header = headers.lastHeader("schema");
            if (header == null) {
                throw new SerializationException("Missing 'schema' header on topic '" + topic + "'");
            }
            String schemaString2 = new String(header.value(), StandardCharsets.UTF_8);
            Schema schema = new Schema.Parser().parse(schemaString2);
            DatumReader<GenericRecord> datumReader = new GenericDatumReader<>(schema);
            Decoder decoder = DecoderFactory.get().binaryDecoder(data, null);
            result = (T) datumReader.read(null, decoder);
            LOGGER.debug(result.getSchema().toString());
            LOGGER.debug("deserialized data='{}'", result);
        }
        return result;
    } catch (Exception ex) {
        throw new SerializationException(
                "Can't deserialize data '" + Arrays.toString(data) + "' from topic '" + topic + "'", ex);
    }
}