Reputation: 10580
I am producing a Kafka record, using an Avro schema that is registered in Schema Registry.
The subject is registered, because when I hit http://localhost:8081/subjects/player-value/versions/2
I get:
{
subject: "player-value",
version: 2,
id: 21,
schema: "{"type":"record","name":"player_value","namespace":"com.mycorp.mynamespace","doc":"Sample schema to help you get started.","fields":[{"name":"id","type":"int","doc":"The int type is a 32-bit signed integer."},{"name":"first_name","type":"string","doc":"The string is a unicode character sequence."}]}"
}
I am downloading the schema, and then use GenericRecord to produce to a topic with that schema.
I have set the subject value strategy to RecordNamingStragegy.
I create a GenericRecord like this:
Schema schema = new Schema.Parser().parse(subject.schema);
System.out.println(subject.schema);
return new GenericData.Record(schema);
record.put("id", 1);
record.put("first_name", "foobar");
where subject.schema is:
{"type":"record","name":"player_value","namespace":"com.mycorp.mynamespace","doc":"Sample schema to help you get started.","fields":[{"name":"id","type":"int","doc":"The int type is a 32-bit signed integer."},{"name":"first_name","type":"string","doc":"The string is a unicode character sequence."}]}
However, when I produce, I get this error:
SerializationException: Error retrieving Avro schema: {"type":"record","name":"player_value","namespace":"com.mycorp.mynamespace","doc":"Sample schema to help you get started.","fields":[{"name":"id","type":"int","doc":"The int type is a 32-bit signed integer."},{"name":"first_name","type":"string","doc":"The string is a unicode character sequence."}]}
This is my full code (which is not needed for you to read all):
public static void main(String[] args) throws Exception {
schemaRegistryUtil.downloadSchema("player-value", 2)
.thenApply(subject -> {
Schema schema = new Schema.Parser().parse(subject.schema);
System.out.println(subject.schema);
return new GenericData.Record(schema);
})
.thenApply(record -> {
record.put("id", 1);
record.put("first_name", "Totti");
return record;
})
.thenApply(record -> producer.produce("some-key", record, TOPIC))
.whenCompleteAsync((metadata, throwable) -> {
if (throwable != null) {
System.out.println(String.format("Error happened %s", throwable.getMessage()));
} else {
System.out.println("all good man");
}
});
}
Funny enough, If I remove
properties.setProperty(AbstractKafkaAvroSerDeConfig.VALUE_SUBJECT_NAME_STRATEGY, RecordNameStrategy.class.getName());
It works well!
Upvotes: 0
Views: 1116
Reputation: 9357
There are different naming strategies for the schema subjects in the schema registry. This link provides a description of the strategies, the following is an extraction from that page..
Any implementation of
io.confluent.kafka.serializers.subject.SubjectNameStrategy
can be specified. By default,<topic>-value
is used as subject.
The default is TopicNameStrategy
which comprises of the topic_name
-key or topic-name
-value depending on the configuration isKey supplied to the serializer. This allows the topic to have a single subject.
The other naming strategy is RecordNameStrategy
that names on the basis of the avro record. From the docs..
For any Avro record type that is published to Kafka topic
<topicName>
, registers the schema in the registry under the subject name<topicName>-<recordName>
, where<recordName>
is the fully-qualified Avro record name.This strategy allows a topic to contain a mixture of different record types, since no intra-topic compatibility checking is performed
Moreover, different topics may contain mutually incompatible versions of the same record name, since the compatibility check is scoped to a particular record name within a particular topic.
Sometimes, we get exception even if the schema registry is inaccessible, though it seems not to be the case for you.
Upvotes: 0