Reputation: 3
I am starting to get into Apache Kafka (Confluent) and have some questions about the use of schemas. First, is my general understanding correct that a schema is used for validating the data? My understanding is that when data is produced, it is checked against the schema to see whether the keys and values fit the predefined structure, and is split up accordingly.
My current technical setup is as follows:
Python:
from confluent_kafka import Producer
from config import conf
import json
# create the producer
producer = Producer(conf)
# produce a JSON-encoded value (no key and no headers are passed)
producer.produce("datagen-topic", json.dumps({"product":"table","brand":"abc"}))
producer.flush()
In Confluent, I set up a JSON key schema for my topic:
{
  "$schema": "http://json-schema.org/draft-04/schema#",
  "properties": {
    "brand": {
      "type": "string"
    },
    "product": {
      "type": "string"
    }
  },
  "required": [
    "product",
    "brand"
  ],
  "type": "object"
}
Now, when I produce the data, the message in Confluent only contains content in "Value"; the Key and Headers are null:
{
"product": "table",
"brand": "abc"
}
Basically, it makes no difference whether I have this schema set up or not, so I assume it is simply not being applied. Can you help me see where my thinking is wrong or what my code is missing?
Upvotes: 0
Views: 3049
Reputation: 191728
The Producer class in the Confluent Python library doesn't interact with the Schema Registry in any way, so your message won't be validated.
You'll want to use SerializingProducer, as in this example - https://github.com/confluentinc/confluent-kafka-python/blob/master/examples/json_producer.py
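A minimal sketch along the lines of that example, assuming a broker at localhost:9092 and a Schema Registry at http://localhost:8081 (swap in your own Confluent Cloud endpoints and credentials):

```python
from confluent_kafka import SerializingProducer
from confluent_kafka.serialization import StringSerializer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.json_schema import JSONSerializer

# Same JSON schema as the one registered on the topic
schema_str = """
{
  "$schema": "http://json-schema.org/draft-04/schema#",
  "type": "object",
  "properties": {
    "brand": {"type": "string"},
    "product": {"type": "string"}
  },
  "required": ["product", "brand"]
}
"""

# Placeholder URL - use your Schema Registry address/credentials
schema_registry_client = SchemaRegistryClient({"url": "http://localhost:8081"})
json_serializer = JSONSerializer(schema_str, schema_registry_client)

producer = SerializingProducer({
    "bootstrap.servers": "localhost:9092",  # placeholder broker address
    "key.serializer": StringSerializer("utf_8"),
    "value.serializer": json_serializer,
})

# The value serializer validates the dict against the schema and talks to the
# Schema Registry before the bytes are sent; don't json.dumps() the value yourself
producer.produce("datagen-topic",
                 key="table-1",  # hypothetical key
                 value={"product": "table", "brand": "abc"})
producer.flush()
```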
If you want non-null keys and headers, you'll need to pass them explicitly to the produce() call, for example:
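(The key and header values below are just placeholders.)

```python
# Keys and headers are separate arguments to produce();
# they are never derived from the value payload
producer.produce(
    "datagen-topic",
    key="table-1",
    value=json.dumps({"product": "table", "brand": "abc"}),
    headers=[("source", b"datagen")],  # headers are (str, bytes) pairs
)
```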
Upvotes: 2