CarloP
CarloP

Reputation: 99

Faust streaming app - Avro schema not published to Kafka cluster

I have set up a Kafka cluster with a schema-registry using docker containers.
Then I built a Faust streaming app that sends messages to a Kafka topic.

The messages are sent to the Kafka topic and stored as expected. However, I cannot see the defined Avro schema in the Schema Registry (viewed through Confluent Control Center).

Why is the Avro schema not showing up in Control Center? What am I missing?

Thanks!
Carlo

enter image description here

enter image description here

app.py

import faust


# Application-wide settings, kept together so they can be changed in one place.
VERSION = 1
PROJECT = "users"
ORIGIN = "app"
BROKER = 'kafka-1:29092'
# Modules passed explicitly to app.discover() below (autodiscover is disabled).
MODULES = [
    f"{ORIGIN}.user",
]

app = faust.App(
    PROJECT,
    # Use the VERSION constant rather than a duplicated literal so the two
    # values cannot drift apart.
    version=VERSION,
    autodiscover=False,
    origin=ORIGIN,
    broker=BROKER,
    topic_replication_factor=1,
    topic_partitions=1,
    store='memory://',
)

# Load the listed modules so the agents and timers they declare are attached
# to this app instance.
app.discover(*MODULES)

def main() -> None:
    """Entry point: delegate to ``app.main()`` of the module-level Faust app."""
    app.main()

user.py

import logging
import faust
from app.app import app
from schema_registry.client import SchemaRegistryClient, schema
from schema_registry.serializers.faust import FaustSerializer


logger = logging.getLogger(__name__)


# Client used to talk to the Schema Registry service.
client = SchemaRegistryClient(url="http://schema-registry:8081")

# Avro schema describing the value of a user message.
avro_user_schema = schema.AvroSchema({
    "type": "record",
    "namespace": "com.example",
    "name": "AvroUsers",
    "fields": [
        {"name": "first_name", "type": "string"},
        {"name": "last_name", "type": "string"}
    ]
})

# The second argument is the Schema Registry *subject* the schema is registered
# under. Tools that follow the default topic-name subject strategy (such as
# Confluent Control Center) look up the value schema of a topic under
# "<topic>-value", so the subject must be derived from the topic name
# ("avro_users"), not from the project name. With the previous subject "users"
# the schema was registered, but not associated with the topic.
avro_user_serializer = FaustSerializer(client, "avro_users-value", avro_user_schema)

# Register the serializer as a Faust codec so models can refer to it by name.
faust.serializers.codecs.register(name='avro_users', codec=avro_user_serializer)

# model
class UserModel(faust.Record, serializer='avro_users'):
    """User record that names the 'avro_users' codec as its serializer."""

    first_name: str
    last_name: str
# Topic 'avro_users' (declared with one partition) whose values are UserModel records.
users_topic = app.topic('avro_users', partitions=1, value_type=UserModel)

@app.agent(users_topic)
async def users(users):
    """Log every user event consumed from ``users_topic``.

    Args:
        users: Asynchronous stream of events handed to the agent; each event
            is read for its ``first_name`` and ``last_name`` attributes.
    """
    async for user in users:
        logger.info("Event received in topic avro_users")
        # Pass the values as lazy %-style arguments so the message is only
        # formatted when the record is actually emitted (this runs per event).
        logger.info("First Name: %s, last name %s", user.first_name, user.last_name)

@app.timer(1.0, on_leader=True)
async def publish_users():
    """Send one sample user message through the ``users`` agent.

    Registered via ``app.timer`` with an interval of 1.0 and
    ``on_leader=True``; the value is serialized with
    ``avro_user_serializer``.
    """
    logger.info('PUBLISHING ON LEADER FOR USERS APP!')
    await users.send(
        value={"first_name": "foo", "last_name": "bar"},
        value_serializer=avro_user_serializer,
    )

schema-registry

# Schema Registry service; listens on port 8081 and is published on the host
# under the same port.
schema-registry:
  image: confluentinc/cp-schema-registry:7.1.0
  hostname: schema-registry
  container_name: schema-registry
  depends_on:
    - kafka-1
  ports:
    - "8081:8081"
  environment:
    SCHEMA_REGISTRY_HOST_NAME: schema-registry
    SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka-1:29092
    # NOTE(review): the ZooKeeper connection setting looks redundant next to
    # KAFKASTORE_BOOTSTRAP_SERVERS and may be deprecated in this image
    # version - confirm against the Schema Registry configuration reference.
    SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: zookeeper:2181
    SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
    # Quoted so the YAML parser keeps it a string; some Compose versions
    # reject bare booleans in the environment mapping.
    SCHEMA_REGISTRY_DEBUG: "true"
  networks:
    - stream

Upvotes: 0

Views: 726

Answers (0)

Related Questions