Reputation: 333
I am currently looking into solutions for unexpected behaviour I'm seeing in particular Avro schema evolution scenarios. We use Java, and in a consumer we do a deepCopy to parse the incoming GenericRecord into the specific class that was generated from the Avro schema.
In order to explain what is happening, I will use a simplified schema example:
{
    "name": "SimpleEvent",
    "type": "record",
    "namespace": "com.simple.schemas",
    "fields": [
        {
            "name": "firstfield",
            "type": "string",
            "default": ""
        },
        {
            "name": "secondfield",
            "type": "string",
            "default": ""
        },
        {
            "name": "thirdfield",
            "type": "string",
            "default": ""
        }
    ]
}
It's just a simple schema with three string fields, all optional since they have default values. Now assume that at some point I want to add another string field and also remove one existing field because it's no longer needed. You then end up with this:
{
    "name": "SimpleEvent",
    "type": "record",
    "namespace": "com.simple.schemas",
    "fields": [
        {
            "name": "firstfield",
            "type": "string",
            "default": ""
        },
        {
            "name": "secondfield",
            "type": "string",
            "default": ""
        },
        {
            "name": "newfield",
            "type": "string",
            "default": ""
        }
    ]
}
According to the schema evolution rules, these should not be breaking changes. However, when the producer starts generating events with the newer schema, something odd happens in downstream consumers.
It turns out that the generated Java classes (I used the Gradle Avro plug-in to generate the class, but the Maven plug-in and the avro-tools command-line code generation yield the same output) only look at field order and don't map fields based on their name.
This means that the value of the field "newfield" gets mapped to "thirdfield" by downstream consumers that use the older version of the schema to read the data.
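To make that concrete, here is a minimal sketch of the behaviour (newSchema is a placeholder for the parsed newer schema, and the consumer still has the old generated SimpleEvent class, including thirdfield, on its classpath):

    // Record built against the newer writer schema, which has "newfield" at position 2.
    GenericRecord record = new GenericData.Record(newSchema);
    record.put("firstfield", "a");
    record.put("secondfield", "b");
    record.put("newfield", "c");

    // deepCopy instantiates the old generated SimpleEvent and copies values by position,
    // so whatever was written as "newfield" ends up in "thirdfield" on the consumer side.
    SimpleEvent copy = (SimpleEvent) SpecificData.get().deepCopy(record.getSchema(), record);
    System.out.println(copy.getThirdfield()); // prints "c"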
I've found some work where a manual mapping is performed based on field name; however, that doesn't work for nested objects.
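For flat records that boils down to something like the sketch below (my own illustration, not code from that work); as soon as a field is itself a record you have to recurse into the sub-schemas, which is where it stops being practical:

    // Copy values by field name from the incoming GenericRecord into the generated class,
    // falling back to the reader schema's default for fields the writer did not send.
    SimpleEvent target = new SimpleEvent();
    for (Schema.Field readerField : SimpleEvent.getClassSchema().getFields()) {
        Object value = request.get(readerField.name()); // null if the writer schema has no such field
        target.put(readerField.pos(), value != null ? value : readerField.defaultVal());
    }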
Through some local experiments I also found another approach that does resolve the schema differences properly:
    Schema readerSchema = SimpleEvent.getClassSchema();
    Schema writerSchema = request.getSchema();

    // Shortcut: no schema resolution needed when both schemas are identical.
    if (readerSchema.equals(writerSchema)) {
        return (SimpleEvent) SpecificData.get().deepCopy(writerSchema, request);
    }

    // Re-encode the GenericRecord with the writer schema...
    DatumWriter<GenericRecord> writer = new SpecificDatumWriter<>(writerSchema);
    ByteArrayOutputStream stream = new ByteArrayOutputStream();
    BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(stream, null);
    writer.write(request, encoder);
    encoder.flush();
    byte[] recordBytes = stream.toByteArray();

    // ...and decode it again with a reader that resolves the writer schema against the reader schema.
    Decoder decoder = DecoderFactory.get().binaryDecoder(recordBytes, null);
    SpecificDatumReader<SimpleEvent> specificDatumReader = new SpecificDatumReader<>(writerSchema, readerSchema);
    SimpleEvent result = specificDatumReader.read(null, decoder);
    return result;
However, this seems like a rather wasteful/inelegant approach, because you first have to serialize the GenericRecord into a byte array and then read it back with the SpecificDatumReader.
The difference between deepCopy and the DatumReader classes is that the DatumReader classes seem to accommodate scenarios where the writer schema is different from the reader schema.
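If the round trip does stay, it could at least be wrapped in a small reusable helper; here is a rough sketch of that idea (the class name AvroRecordConverter and the method name toSpecific are placeholders of my own, not part of any library):

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import org.apache.avro.Schema;
    import org.apache.avro.generic.*;
    import org.apache.avro.io.*;
    import org.apache.avro.specific.*;

    public final class AvroRecordConverter {

        // Re-encodes the record with its own (writer) schema and decodes it again with a
        // SpecificDatumReader that resolves the writer schema against the reader schema.
        public static <T extends SpecificRecordBase> T toSpecific(GenericRecord record, Schema readerSchema)
                throws IOException {
            ByteArrayOutputStream stream = new ByteArrayOutputStream();
            BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(stream, null);
            new GenericDatumWriter<GenericRecord>(record.getSchema()).write(record, encoder);
            encoder.flush();

            Decoder decoder = DecoderFactory.get().binaryDecoder(stream.toByteArray(), null);
            SpecificDatumReader<T> reader = new SpecificDatumReader<>(record.getSchema(), readerSchema);
            return reader.read(null, decoder);
        }
    }

The call site then becomes SimpleEvent event = AvroRecordConverter.toSpecific(request, SimpleEvent.getClassSchema()); but it's still the same byte-array detour underneath.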
I feel there should/can be a better, more elegant way to handle this. I would really appreciate any help/tips in getting there.
Thanks in advance :)
Oskar
Upvotes: 1
Views: 2564
Reputation: 333
After more digging and looking at the KafkaAvroDeserializer we were previously using in our listeners, I noticed that AbstractKafkaAvroDeserializer has a deserialize method to which you can pass the reader schema. It looked too good to be true, but it works!
    package com.oskar.generic.consumer.demo;

    import com.simple.schemas.SimpleEvent;
    import io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer;
    import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
    import org.apache.kafka.common.serialization.Deserializer;

    import java.util.Map;

    public class SimpleEventDeserializer extends AbstractKafkaAvroDeserializer implements Deserializer<Object> {

        private boolean isKey;

        @Override
        public void configure(Map<String, ?> configs, boolean isKey) {
            this.isKey = isKey;
            configure(new KafkaAvroDeserializerConfig(configs));
        }

        @Override
        public Object deserialize(String s, byte[] bytes) {
            // Pass the reader schema explicitly so the payload is resolved against it.
            return super.deserialize(bytes, SimpleEvent.getClassSchema());
        }

        @Override
        public void close() {
        }
    }
This deserializer is then used in the consumer factory like this:
    @Bean
    public ConsumerFactory<String, GenericRecord> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29095");
        props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "one");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, SimpleEventDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }
And the listener code itself looks like this:
    @KafkaListener(topics = "my-topic")
    public GenericRecord listen(@Payload GenericRecord request, @Headers MessageHeaders headers) throws IOException {
        // The payload has already been resolved against the reader schema by the deserializer,
        // so the deepCopy into the generated class now maps the fields correctly.
        SimpleEvent event = (SimpleEvent) SpecificData.get().deepCopy(request.getSchema(), request);
        return request;
    }
Upvotes: 1