Reputation: 83
We use Schema Registry to store schemas; messages are serialised to Avro and pushed to Kafka topics.
When reading data in the consumer, how can we find the schema ID with which an Avro record was serialised? We need this schema ID to track schema changes, such as a column being added to the table. When columns are added or deleted, Schema Registry generates a new schema ID, and we need a way to get that ID in the consumer.
from kafka import KafkaConsumer

# conf is the application's own settings dict
consumer = KafkaConsumer(bootstrap_servers=conf['BOOTSTRAP_SERVERS'],
                         auto_offset_reset=conf['AUTO_OFFSET'],
                         enable_auto_commit=conf['AUTO_COMMIT'],
                         auto_commit_interval_ms=conf['AUTO_COMMIT_INTERVAL'])
consumer.subscribe(conf['KAFKA_TOPICS'])
for message in consumer:
    print(message.key)
In the code above, message.key prints the key of each record. How do we find the corresponding schema ID that the consumer uses to deserialise the record?
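Assuming the producer used Confluent's Schema Registry-aware Avro serializer, the consumer does not need to ask the registry which ID a record was written with: the ID is embedded in each message. In the Confluent wire format, the serialised bytes start with a magic byte `0`, followed by the schema ID as a 4-byte big-endian integer, followed by the Avro payload. A minimal sketch of reading it; `extract_schema_id` is a hypothetical helper name, not part of any library:

```python
import struct

MAGIC_BYTE = 0  # first byte of every Confluent wire-format message


def extract_schema_id(raw: bytes) -> int:
    """Return the Schema Registry ID from a Confluent wire-format message.

    Assumed layout: 1 magic byte (0) + 4-byte big-endian schema ID + Avro payload.
    """
    if raw is None or len(raw) < 5:
        raise ValueError("message too short to contain a schema ID")
    # ">BI": big-endian, unsigned byte followed by unsigned 32-bit int (5 bytes total)
    magic, schema_id = struct.unpack(">BI", raw[:5])
    if magic != MAGIC_BYTE:
        raise ValueError(f"unexpected magic byte {magic}; not Confluent wire format")
    return schema_id


if __name__ == "__main__":
    # Hypothetical framed message carrying schema ID 33, followed by some Avro bytes
    sample = bytes([0]) + struct.pack(">I", 33) + b"\x02\x04"
    print(extract_schema_id(sample))  # 33
```

In the consumer loop above, `extract_schema_id(message.value)` would give the value schema ID of each record, which can be compared with the `id` returned by the registry. If keys are Avro-serialised the same way, `message.key` carries its own 5-byte prefix with the key schema ID.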
curl -X GET http://localhost:8081/subjects/helpkit_internal.helpkit_support.agents-value/versions/2
{"subject":"helpkit_internal.helpkit_support.agents-value","version":2,"id":33,"schema":"{\"type\":\"record\",\"name\":\"Envelope\",\"namespace\":\"helpkit_internal.helpkit_support.agents\",\"fields\":[{\"name\":\"before\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"Value\",\"fields\":[{\"name\":\"id\",\"type\":\"long\"},{\"name\":\"user_id\"
From the consumer, we want to get this id value ("id":33).
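The same registry lookup can be scripted from Python with only the standard library, calling the REST endpoint used in the curl command above. Note that this returns the ID of a registered version, not necessarily the ID a particular message was written with. A minimal sketch, assuming an unauthenticated registry; `fetch_schema_id` and `parse_schema_id` are hypothetical helpers:

```python
import json
import urllib.parse
import urllib.request


def parse_schema_id(body: str) -> int:
    """Pull the "id" field out of a /subjects/{subject}/versions/{version} response."""
    return json.loads(body)["id"]


def fetch_schema_id(base_url: str, subject: str, version: str = "latest") -> int:
    """GET {base_url}/subjects/{subject}/versions/{version} and return its schema ID."""
    url = (
        f"{base_url.rstrip('/')}/subjects/"
        f"{urllib.parse.quote(subject, safe='')}/versions/{version}"
    )
    with urllib.request.urlopen(url) as resp:
        return parse_schema_id(resp.read().decode("utf-8"))
```

For example, `fetch_schema_id("http://localhost:8081", "helpkit_internal.helpkit_support.agents-value", "2")` should return the `id` field of the response shown above; passing `"latest"` asks for the newest version of the subject.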
Any suggestions would be appreciated.
Upvotes: 7
Views: 6088
Reputation: 39790
What you can do is fetch the latest schema ID registered for the topic's subject:
Using confluent-kafka-python
from confluent_kafka.avro.cached_schema_registry_client import CachedSchemaRegistryClient

sr = CachedSchemaRegistryClient({
    'url': 'http://localhost:8081',
    'ssl.certificate.location': '/path/to/cert',  # optional
    'ssl.key.location': '/path/to/key'  # optional
})
# get_latest_schema() returns a (schema_id, schema, version) tuple
value_schema_id, value_schema, value_version = sr.get_latest_schema("helpkit_internal.helpkit_support.agents-value")
key_schema_id, key_schema, key_version = sr.get_latest_schema("helpkit_internal.helpkit_support.agents-key")
Using SchemaRegistryClient (python-schema-registry-client)
Getting schema by subject name
from schema_registry.client import SchemaRegistryClient

sr = SchemaRegistryClient('http://localhost:8081')
my_schema = sr.get_schema(subject='helpkit_internal.helpkit_support.agents-value', version='latest')
print(my_schema.schema_id)  # ID of the latest registered version
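Whichever way the ID is obtained, detecting an added or removed column (the goal in the question) comes down to noticing that a topic's records start arriving with a different schema ID, since Schema Registry assigns a distinct ID to each distinct schema. A minimal sketch, assuming the per-record schema ID is already available; `SchemaChangeTracker` is a hypothetical helper:

```python
from typing import Dict, Optional


class SchemaChangeTracker:
    """Remembers the last schema ID seen per topic and reports when it changes."""

    def __init__(self) -> None:
        self._last_seen: Dict[str, int] = {}

    def update(self, topic: str, schema_id: int) -> Optional[int]:
        """Record schema_id for topic.

        Returns the previous schema ID if it differs from schema_id,
        otherwise None (also None the first time a topic is seen).
        """
        previous = self._last_seen.get(topic)
        self._last_seen[topic] = schema_id
        if previous is not None and previous != schema_id:
            return previous
        return None
```

In the question's consumer loop, a non-None result from `tracker.update(message.topic, schema_id)` is the signal to fetch the old and new schemas from the registry and compare their fields; the ID change alone says that the schema differs, not which columns changed.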
Upvotes: 1