boizor

Reputation: 23

Avro - Deserialize POJOs

I'm new to Avro and Kafka, and I've spent the last few days trying to send serialized data to a Kafka topic, unsuccessfully.

Let me explain what I'm trying to achieve:

On the producer side, I receive data via SOAP and send its content to a Kafka topic. I'm using CXF to generate POJOs from the WSDL, and I've written the corresponding Avro schemas. What I'm trying to do is serialize the objects unmarshaled by CXF and send them to my Kafka topic.

In most examples I found on the web, Avro records are generated from a known schema (or data type), but in my case I don't know in advance which schema will be used to serialize the data. So I determine the message type dynamically (via a CXF interceptor) and serialize it this way:

// get unmarshaled POJO
MessageContentsList objs = MessageContentsList.getContentsList(message);
Object obj = objs.get(0);

EncoderFactory factory = EncoderFactory.get();
ByteArrayOutputStream out = new ByteArrayOutputStream();
Encoder encoder = factory.directBinaryEncoder(out, null);

// getting schema from class name (first approach)
String scName = obj.getClass().getSimpleName();
InputStream avroRes = this.getClass().getClassLoader().getResourceAsStream(scName);
Schema schema = new Schema.Parser().parse(avroRes);

ReflectDatumWriter<Object> writer = new ReflectDatumWriter<Object>(schema);
writer.write(obj, encoder);
encoder.flush();
out.close();

KeyedMessage<String, byte[]> kMessage = new KeyedMessage<String, byte[]>("mytopic", out.toByteArray());
producer.send(kMessage);

This way I can send data to my topic, but on the consumer side I'm not able to get the schema from the incoming message.
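To make the problem concrete, here is a minimal hand-rolled sketch (JDK only, not the Avro library) of how Avro's binary encoding lays out a hypothetical record with a `name` string and an `age` int. Values are written back to back in schema order, as zig-zag varints and length-prefixed UTF-8, with no field names or type tags, so the bytes alone do not say which schema produced them. The class and method names are illustrative assumptions.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class AvroBinarySketch {

    // Avro writes int/long values as zig-zag encoded variable-length integers.
    static void writeLong(ByteArrayOutputStream out, long n) {
        long z = (n << 1) ^ (n >> 63); // zig-zag: small magnitudes -> small unsigned values
        while ((z & ~0x7FL) != 0) {
            out.write((int) ((z & 0x7F) | 0x80)); // 7 payload bits + continuation bit
            z >>>= 7;
        }
        out.write((int) z);
    }

    // Avro writes a string as its byte length (a long) followed by the UTF-8 bytes.
    static void writeString(ByteArrayOutputStream out, String s) {
        byte[] utf8 = s.getBytes(StandardCharsets.UTF_8);
        writeLong(out, utf8.length);
        out.write(utf8, 0, utf8.length);
    }

    // Hypothetical record {"name": string, "age": int}: fields are concatenated
    // in schema order; nothing in the output identifies the schema.
    static byte[] encodePerson(String name, int age) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeString(out, name);
        writeLong(out, age);
        return out.toByteArray();
    }

    public static void main(String[] args) {
        // Prints [4, 97, 98, 6]
        System.out.println(Arrays.toString(encodePerson("ab", 3)));
    }
}
```

Those four bytes are all a consumer receives: without the writer's schema it cannot tell that `4` is a string length rather than an int field.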

Is there a way to:

What are the "best" practices for sending Avro records to Kafka topics when the data types are unknown?

Maybe I missed something when reading the Avro documentation and am not using it as intended.

Thanks for your help...

Upvotes: 1

Views: 4772

Answers (1)

Chin Huang

Reputation: 13790

Messages sent to the Kafka topic should encode both the schema and the Avro record. If sending the full schema in each message is too much overhead, send an identifier for the schema instead. A message consumer can use the identifier to retrieve the full schema definition from a schema registry. For example, this code to serialize a Kafka message writes a magic byte followed by the schema identifier at the start of the message, then the Avro-encoded record:

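// MAGIC_BYTE, idSize, schemaRegistry, encoderFactory and subject are assumed
// to be defined elsewhere in the enclosing serializer
ByteArrayOutputStream out = new ByteArrayOutputStream();

// Register (or look up) the writer's schema and put its id ahead of the payload
Schema schema = getSchema(object);
int id = schemaRegistry.register(subject, schema);
out.write(MAGIC_BYTE);
out.write(ByteBuffer.allocate(idSize).putInt(id).array());

// Append the Avro binary encoding of the record itself
// (plain POJOs, such as CXF-generated classes, would need a ReflectDatumWriter)
BinaryEncoder encoder = encoderFactory.directBinaryEncoder(out, null);
DatumWriter<Object> writer;
if (object instanceof SpecificRecord) {
  writer = new SpecificDatumWriter<Object>(schema);
} else {
  writer = new GenericDatumWriter<Object>(schema);
}
writer.write(object, encoder);
encoder.flush();

byte[] bytes = out.toByteArray();
out.close();
return bytes;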
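The serializer above relies on `MAGIC_BYTE`, `idSize` and a registry that are not shown. A minimal JDK-only sketch of that framing, including the consumer side that recovers the writer's schema, assuming a zero magic byte and a 4-byte big-endian id (the values Confluent's Kafka Avro serializer uses) and a hypothetical in-memory registry standing in for a real registry service:

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class SchemaIdFraming {

    // Assumed framing: one magic byte, then a 4-byte big-endian schema id.
    static final byte MAGIC_BYTE = 0x0;
    static final int ID_SIZE = 4;

    // Hypothetical stand-in for a schema registry: schema text <-> integer id.
    static final class InMemorySchemaRegistry {
        private final Map<String, Integer> idsBySchema = new HashMap<>();
        private final Map<Integer, String> schemasById = new HashMap<>();

        synchronized int register(String schemaJson) {
            Integer existing = idsBySchema.get(schemaJson);
            if (existing != null) {
                return existing; // same schema text -> same id
            }
            int id = idsBySchema.size() + 1;
            idsBySchema.put(schemaJson, id);
            schemasById.put(id, schemaJson);
            return id;
        }

        synchronized String lookup(int id) {
            String schema = schemasById.get(id);
            if (schema == null) {
                throw new IllegalArgumentException("Unknown schema id " + id);
            }
            return schema;
        }
    }

    // Producer side: prefix the Avro-encoded payload with magic byte + schema id.
    static byte[] frame(int schemaId, byte[] avroPayload) {
        return ByteBuffer.allocate(1 + ID_SIZE + avroPayload.length)
                .put(MAGIC_BYTE)
                .putInt(schemaId) // ByteBuffer is big-endian by default
                .put(avroPayload)
                .array();
    }

    // Consumer side: check the magic byte, read the id, return the writer's schema.
    static String writerSchema(byte[] message, InMemorySchemaRegistry registry) {
        ByteBuffer buf = ByteBuffer.wrap(message);
        if (buf.get() != MAGIC_BYTE) {
            throw new IllegalArgumentException("Unknown magic byte");
        }
        return registry.lookup(buf.getInt());
    }

    // Consumer side: the Avro payload starts right after the 5-byte header.
    static byte[] payload(byte[] message) {
        return Arrays.copyOfRange(message, 1 + ID_SIZE, message.length);
    }
}
```

With a real registry, the consumer would parse the returned schema text with `Schema.Parser` and decode `payload(message)` against it, so it never needs to know the data type in advance.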

Upvotes: 1
