Hemanth Kumar

Reputation: 83

How to find the schema ID from the schema registry used for Avro records when reading from a Kafka consumer

We use a schema registry to store schemas; messages are serialised to Avro and pushed to Kafka topics.

When reading data in the consumer, we want to find the ID of the schema each Avro record was serialised with. We need this schema ID to track changes, for example whether a new column has been added to the table: when columns are added or removed, a new schema ID is generated in the schema registry. How can we get that ID in the consumer?
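One way to act on that, sketched here as a hypothetical helper (the class and method names are illustrative, not from any library), is to remember the last schema ID seen per topic and flag when it changes. It assumes the consumer can already obtain a schema ID for each record; the IDs in the example are illustrative, with 34 made up.

```python
from typing import Dict, Optional


class SchemaChangeTracker:
    """Hypothetical helper: remembers the last schema ID seen per topic.

    The schema IDs are assumed to be supplied by the consumer for each record.
    """

    def __init__(self) -> None:
        self._last_seen: Dict[str, int] = {}

    def observe(self, topic: str, schema_id: int) -> bool:
        """Record schema_id for topic; return True if it differs from the previous one."""
        previous: Optional[int] = self._last_seen.get(topic)
        self._last_seen[topic] = schema_id
        # The first ID seen for a topic is the baseline, not a change.
        return previous is not None and previous != schema_id


tracker = SchemaChangeTracker()
topic = "helpkit_internal.helpkit_support.agents"
for schema_id in (33, 33, 34):  # 34 is a made-up ID for illustration
    if tracker.observe(topic, schema_id):
        print(f"schema for {topic} changed to id {schema_id}")
```

Note that a new ID signals that the writer schema changed in some way, not specifically that a column was added or removed; comparing the old and new schemas tells you which.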

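from kafka import KafkaConsumer

# conf is a dict of settings loaded elsewhere in the application
consumer = KafkaConsumer(bootstrap_servers=conf['BOOTSTRAP_SERVERS'],
                         auto_offset_reset=conf['AUTO_OFFSET'],
                         enable_auto_commit=conf['AUTO_COMMIT'],
                         auto_commit_interval_ms=conf['AUTO_COMMIT_INTERVAL'])
consumer.subscribe(conf['KAFKA_TOPICS'])

# No deserializers are configured, so message.key and message.value are raw bytes
for message in consumer:
    print(message.key)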

In the code above, message.key prints the key of each record. How do we find the corresponding schema ID that the consumer uses to deserialise the record?
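A minimal sketch, assuming the producer used Confluent's Avro serializer (or another serializer that writes the Confluent wire format): each serialized value starts with a magic byte `0`, followed by the schema ID as a 4-byte big-endian integer, followed by the Avro payload. Because the kafka-python consumer above has no deserializer configured, `message.value` holds those raw bytes, so the ID can be read directly. The function name `extract_schema_id` is hypothetical.

```python
import struct
from typing import Optional

MAGIC_BYTE = 0  # first byte of every Confluent-framed message


def extract_schema_id(raw: Optional[bytes]) -> int:
    """Return the Schema Registry ID embedded in a Confluent-framed message.

    Assumes the Confluent wire format: 1 magic byte (0), a 4-byte big-endian
    schema ID, then the Avro-encoded payload.
    """
    if raw is None or len(raw) < 5:
        raise ValueError("message too short to carry a schema ID header")
    magic, schema_id = struct.unpack(">bI", raw[:5])
    if magic != MAGIC_BYTE:
        raise ValueError(f"unexpected magic byte {magic}; not Confluent-framed")
    return schema_id


# Example: a header for schema ID 33 followed by a placeholder payload.
sample = bytes([0, 0, 0, 0, 33]) + b"...avro payload..."
print(extract_schema_id(sample))  # 33
```

Inside the consumer loop this would be `extract_schema_id(message.value)` (or `message.key`, if keys are Avro-serialized too). Tombstone records have a `None` value and raise `ValueError` here.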

curl -X GET http://localhost:8081/subjects/helpkit_internal.helpkit_support.agents-value/versions/2

{"subject":"helpkit_internal.helpkit_support.agents-value","version":2,"id":33,"schema":"{\"type\":\"record\",\"name\":\"Envelope\",\"namespace\":\"helpkit_internal.helpkit_support.agents\",\"fields\":[{\"name\":\"before\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"Value\",\"fields\":[{\"name\":\"id\",\"type\":\"long\"},{\"name\":\"user_id\"

From the consumer, we want to get the value of "id" (33 here).
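For reference, the curl lookup can be reproduced with only the Python standard library. This is a sketch: the function names are hypothetical, and it relies on the Schema Registry REST endpoint `GET /subjects/{subject}/versions/{version}` (which also accepts `latest`) returning JSON with an `"id"` field, as in the response above.

```python
import json
import urllib.parse
import urllib.request
from typing import Union


def subject_version_url(base_url: str, subject: str,
                        version: Union[int, str] = "latest") -> str:
    """Build the URL for GET /subjects/{subject}/versions/{version}."""
    quoted = urllib.parse.quote(subject, safe="")
    return f"{base_url.rstrip('/')}/subjects/{quoted}/versions/{version}"


def parse_schema_id(response_body: str) -> int:
    """Read the global schema ID ("id") from a subject-version response."""
    return int(json.loads(response_body)["id"])


def fetch_schema_id(base_url: str, subject: str,
                    version: Union[int, str] = "latest") -> int:
    """Perform the same request as the curl command and return its "id"."""
    url = subject_version_url(base_url, subject, version)
    with urllib.request.urlopen(url) as resp:
        return parse_schema_id(resp.read().decode("utf-8"))
```

Against the registry queried above, `fetch_schema_id("http://localhost:8081", "helpkit_internal.helpkit_support.agents-value", 2)` should return 33. This tells you which ID a given subject version has, not which ID a particular record was written with.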

Please suggest an approach.

Upvotes: 7

Views: 6088

Answers (1)

Giorgos Myrianthous

Reputation: 39790

What you can do is fetch the latest schema ID registered for the topic's subject:

Using confluent-kafka-python

from confluent_kafka.avro.cached_schema_registry_client import CachedSchemaRegistryClient

sr = CachedSchemaRegistryClient({
    'url': 'http://localhost:8081',
    'ssl.certificate.location': '/path/to/cert',  # optional
    'ssl.key.location': '/path/to/key'  # optional
})

# get_latest_schema() returns a (schema_id, schema, version) tuple:
# index 0 is the schema ID, index 1 is the parsed Avro schema.
value_schema_id, value_schema, value_version = sr.get_latest_schema("helpkit_internal.helpkit_support.agents-value")
key_schema_id, key_schema, key_version = sr.get_latest_schema("helpkit_internal.helpkit_support.agents-key")
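The subject names above follow the default TopicNameStrategy, under which schemas are registered as `<topic>-key` and `<topic>-value`. Assuming the producer keeps that default, and that the topic here is `helpkit_internal.helpkit_support.agents` (inferred from the subject name), a small hypothetical helper avoids hard-coding them:

```python
from typing import Tuple


def subject_names(topic: str) -> Tuple[str, str]:
    """Return (key_subject, value_subject) under the default TopicNameStrategy."""
    # TopicNameStrategy registers schemas as "<topic>-key" and "<topic>-value".
    return f"{topic}-key", f"{topic}-value"


key_subject, value_subject = subject_names("helpkit_internal.helpkit_support.agents")
print(value_subject)  # helpkit_internal.helpkit_support.agents-value
```

If the producer was configured with RecordNameStrategy or TopicRecordNameStrategy instead, the subjects are derived from the record name and this helper does not apply.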

Using SchemaRegistryClient (python-schema-registry-client)

Getting schema by subject name

from schema_registry.client import SchemaRegistryClient

sr = SchemaRegistryClient(url='http://localhost:8081')
my_schema = sr.get_schema(subject='helpkit_internal.helpkit_support.agents-value', version='latest')
print(my_schema.schema_id)  # ID of the latest registered version

Upvotes: 1
