Marco Dinatsoli
Marco Dinatsoli

Reputation: 10580

Kafka avrò can't locate subject

I am producing a Kafka record, using an Avro schema that is registered in Schema Registry.

The subject is registered, because when I hit http://localhost:8081/subjects/player-value/versions/2

I get:

{
subject: "player-value",
version: 2,
id: 21,
schema: "{"type":"record","name":"player_value","namespace":"com.mycorp.mynamespace","doc":"Sample schema to help you get started.","fields":[{"name":"id","type":"int","doc":"The int type is a 32-bit signed integer."},{"name":"first_name","type":"string","doc":"The string is a unicode character sequence."}]}"
}

I am downloading the schema, and then use GenericRecord to produce to a topic with that schema.

I have set the subject value strategy to RecordNamingStragegy.

I create a GenericRecord like this:

Schema schema = new Schema.Parser().parse(subject.schema);
                    System.out.println(subject.schema);
                    return new GenericData.Record(schema);
record.put("id", 1);
                    record.put("first_name", "foobar");

where subject.schema is:

{"type":"record","name":"player_value","namespace":"com.mycorp.mynamespace","doc":"Sample schema to help you get started.","fields":[{"name":"id","type":"int","doc":"The int type is a 32-bit signed integer."},{"name":"first_name","type":"string","doc":"The string is a unicode character sequence."}]}

However, when I produce, I get this error:

SerializationException: Error retrieving Avro schema: {"type":"record","name":"player_value","namespace":"com.mycorp.mynamespace","doc":"Sample schema to help you get started.","fields":[{"name":"id","type":"int","doc":"The int type is a 32-bit signed integer."},{"name":"first_name","type":"string","doc":"The string is a unicode character sequence."}]}

This is my full code (which is not needed for you to read all):

public static void main(String[] args) throws Exception {
        schemaRegistryUtil.downloadSchema("player-value", 2)
                .thenApply(subject -> {
                    Schema schema = new Schema.Parser().parse(subject.schema);
                    System.out.println(subject.schema);
                    return new GenericData.Record(schema);
                })
                .thenApply(record -> {
                    record.put("id", 1);
                    record.put("first_name", "Totti");
                    return record;
                })
                .thenApply(record -> producer.produce("some-key", record, TOPIC))
                .whenCompleteAsync((metadata, throwable) -> {
                    if (throwable != null) {
                        System.out.println(String.format("Error happened %s", throwable.getMessage()));
                    } else {
                        System.out.println("all good man");
                    }
                });


    }

Update

Funny enough, If I remove

properties.setProperty(AbstractKafkaAvroSerDeConfig.VALUE_SUBJECT_NAME_STRATEGY, RecordNameStrategy.class.getName());

It works well!

Upvotes: 0

Views: 1116

Answers (1)

JavaTechnical
JavaTechnical

Reputation: 9357

There are different naming strategies for the schema subjects in the schema registry. This link provides a description of the strategies, the following is an extraction from that page..

Any implementation of io.confluent.kafka.serializers.subject.SubjectNameStrategy can be specified. By default, <topic>-value is used as subject.

The default is TopicNameStrategy which comprises of the topic_name-key or topic-name-value depending on the configuration isKey supplied to the serializer. This allows the topic to have a single subject.

The other naming strategy is RecordNameStrategy that names on the basis of the avro record. From the docs..

For any Avro record type that is published to Kafka topic <topicName>, registers the schema in the registry under the subject name <topicName>-<recordName>, where <recordName> is the fully-qualified Avro record name.

This strategy allows a topic to contain a mixture of different record types, since no intra-topic compatibility checking is performed

Moreover, different topics may contain mutually incompatible versions of the same record name, since the compatibility check is scoped to a particular record name within a particular topic.

Sometimes, we get exception even if the schema registry is inaccessible, though it seems not to be the case for you.

Upvotes: 0

Related Questions