Reputation: 99
I have set up a Kafka cluster with a schema-registry using docker containers.
Then I built a Faust streaming app that sends messages to a Kafka topic.
The messages are sent to the Kafka topic and stored as expected. But I cannot see the defined Avro schema in the Kafka schema-registry (using the confluent control-center).
Why is the avro schema not showing up in the control-center? What am I missing?
Thanks!
Carlo
app.py
import faust
VERSION = 1
PROJECT = "users"
ORIGIN = "app"
BROKER = 'kafka-1:29092'
MODULES = [
f"{ORIGIN}.user",
]
app = faust.App(
PROJECT,
version=1,
autodiscover=False,
origin=ORIGIN,
broker=BROKER,
topic_replication_factor=1,
topic_partitions=1,
store='memory://',
)
app.discover(*MODULES)
def main() -> None:
app.main()
user.py
import logging
import faust
from app.app import app
from schema_registry.client import SchemaRegistryClient, schema
from schema_registry.serializers.faust import FaustSerializer
logger = logging.getLogger(__name__)
# create an instance of the `SchemaRegistryClient`
client = SchemaRegistryClient(url="http://schema-registry:8081")
# register codec at faust
avro_user_schema = schema.AvroSchema({
"type": "record",
"namespace": "com.example",
"name": "AvroUsers",
"fields": [
{"name": "first_name", "type": "string"},
{"name": "last_name", "type": "string"}
]
})
avro_user_serializer = FaustSerializer(client, "users", avro_user_schema)
faust.serializers.codecs.register(name='avro_users', codec=avro_user_serializer)
# model
class UserModel(faust.Record, serializer='avro_users'):
first_name: str
last_name: str
# topic
users_topic = app.topic('avro_users', partitions=1, value_type=UserModel)
@app.agent(users_topic)
async def users(users):
async for user in users:
logger.info("Event received in topic avro_users")
logger.info(f"First Name: {user.first_name}, last name {user.last_name}")
@app.timer(1.0, on_leader=True)
async def publish_users():
logger.info('PUBLISHING ON LEADER FOR USERS APP!')
user = {"first_name": "foo", "last_name": "bar"}
await users.send(value=user, value_serializer=avro_user_serializer)
schema-registry
schema-registry:
image: confluentinc/cp-schema-registry:7.1.0
hostname: schema-registry
container_name: schema-registry
depends_on:
- kafka-1
ports:
- "8081:8081"
environment:
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka-1:29092
SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: zookeeper:2181
SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
SCHEMA_REGISTRY_DEBUG: true
networks:
- stream
Upvotes: 0
Views: 726