szend

Reputation: 123

Kafka application cannot retrieve the right schema id from schema registry

Sometimes we face the following issue:

Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro unknown schema for id 16
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema 16 not found
io.confluent.rest.exceptions.RestNotFoundException: Schema 16 not found

But when I check the registered versions with curl, I cannot find schema 16, only schema 1.

In our application there is a Producer and Consumer part (we don't use Kafka Streams).

I'm wondering why the application tries to find schema 16 and where it gets this number from. Is it stored somewhere in the application cache, or in some internal logs on the Kafka broker/Schema Registry?

Our current solution in such cases is to delete the Kafka data directories (/tmp/kafka-logs, /tmp/confluent, /tmp/zookeeper) and recreate the internal _schemas topic, which is a brute-force solution.

What would be the right approach to analyze and solve the problem?

Upvotes: 0

Views: 1934

Answers (2)

OneCricketeer

Reputation: 191743

It is possible that some other producer wrote to your topic without using Avro, so the deserializer just pulls off the first few bytes of the record and interprets whatever is there as the schema ID. Or you've mistakenly configured Avro for both the key and value deserializers when only the value is Avro and the producer's keys are, for example, plain Strings.
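For that second case, a minimal consumer configuration sketch might look like this (the broker address and registry URL are placeholders, not taken from your setup):

import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;

// Hypothetical config: only the value is Avro-framed, so only the
// value deserializer should be the Confluent Avro one.
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka.broker.host:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringDeserializer");   // keys are plain Strings
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
        "io.confluent.kafka.serializers.KafkaAvroDeserializer");       // values carry the schema id
props.put("schema.registry.url", "http://the.schema.registry.host");   // assumed registry URL

Using an Avro deserializer on a String key makes the consumer read the first byte of the key as a magic byte and the next 4 bytes as a schema id, which produces exactly this kind of "schema not found" error.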

current solution in such cases is to delete the Kafka data directories (/tmp/kafka-logs, /tmp/confluent, /tmp/zookeeper)

You should never store persistent data in /tmp. You could start with kafka-topics --bootstrap-server kafka.broker.host:9092 --delete --topic _schemas instead.

Upvotes: 0

Svend

Reputation: 7180

I'm wondering why the application tries to find schema 16 and where it gets this number from. Is it stored somewhere in the application cache, or in some internal logs on the Kafka broker/Schema Registry?

When using the Confluent serializers and deserializers, the producer prefixes each Kafka value with a single magic byte (0x00) followed by the 4-byte schema id.
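If you'd rather inspect this from Java than with CLI tools, here is a minimal sketch decoding that framing from a record's raw value bytes (the class and method names are illustrative only):

import java.nio.ByteBuffer;

// Sketch: decode the Confluent framing, i.e. a 0x00 magic byte followed
// by a 4-byte big-endian schema id, from a record's raw value bytes.
public class SchemaIdPeek {

    public static int schemaIdOf(byte[] value) {
        ByteBuffer buf = ByteBuffer.wrap(value);   // big-endian by default
        byte magic = buf.get();
        if (magic != 0) {
            throw new IllegalArgumentException(
                    "Not a Confluent-framed record, magic byte: " + magic);
        }
        return buf.getInt();                       // the 4-byte schema id, e.g. 16
    }
}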

Assuming you have kcat and hexdump, you can inspect this schema id as follows (assuming the record is stored at offset 40 of partition 2 of that topic):

kcat \
   -b kafka.broker.host:9092 \
   -C \
   -o 40 -p 2 -c 1  \
   -t the_topic_name | hexdump -C -s1 -n4

This should yield something like:

00000001  00 00 01 ab                                       |....|
00000005

00 00 01 ab is the hexadecimal encoding of that record's schema id (it will be different in your case of course), which can be converted to decimal as follows:

echo $(( 16#000001ab ))

which yields

171

(I guess that would be 16 in your case)

And you can then fetch that schema by ID directly from the schema registry as follows:

curl \
  -X GET \
  http://the.schema.registry.host/schemas/ids/16

If the producer sets that value to 16, then schema id 16 must exist in the schema registry the producer is connected to, at least at the moment the record is sent to Kafka.

If that schema id is no longer present later on, then something happened to the registry between the moment the record was produced and the moment it is consumed.
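To rule out a stale application cache, you can also query the registry directly from Java; here is a sketch using the Confluent client (the registry URL is a placeholder, and getSchemaById requires a reasonably recent version of the client library):

import java.io.IOException;

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;

public class RegistryCheck {

    public static void main(String[] args) throws IOException {
        // Point this at the registry the *producer* is configured with
        SchemaRegistryClient client =
                new CachedSchemaRegistryClient("http://the.schema.registry.host", 10);
        try {
            // Fetch and print the schema registered under id 16
            System.out.println(client.getSchemaById(16).canonicalString());
        } catch (RestClientException e) {
            System.out.println("Schema id 16 not found in this registry: " + e.getMessage());
        }
    }
}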

delete the Kafka data directories (/tmp/kafka-logs, /tmp/confluent, /tmp/zookeeper) and recreate the internal _schemas topic, which is a brute-force solution

_schemas is where the associations between schema ids and schemas are stored. If during experimentation that topic got deleted while some old records remained in Kafka (I understand that's not exactly what you're describing, though I'm exploring options here), then you could end up with records in Kafka that reference a schema id that no longer exists in the registry. Under normal circumstances you should never have to touch that topic.

Other cases where such a schema id can go missing include:

  • the producer and the consumer are not connected to the same schema registry cluster;
  • the content of the topic was backed up to some external storage (say, S3), a new empty Kafka cluster was created, and the records were then restored into it; if so, the ids in the new schema registry would no longer match the ids embedded in the records;
  • the value of kafkastore.topic of the schema registry has been modified since the record was produced.

Upvotes: 1
