user3595632
user3595632

Reputation: 5730

Is there any way to define app.Table without using Record in faust?

I'm currently using schema registry and faust to process stream data.

The reason I try to avoid using faust.Record is the schema can be dynamically changed and I don't like to change the code(class inheriting faust.Record) every time it happend.

But without faust.Record, it looks like there are many restrictions. For example, app.Table's relative_to_field requires FieldDescriptorT but this class looks stronly coupled with faust.Record

Here is the code:

import faust

from datetime import timedelta
from pydantic_avro.base import AvroBase

from schema_registry.client import SchemaRegistryClient, schema
from schema_registry.serializers.faust import FaustSerializer


topic_name = "practice4"
subject_name = f"{topic_name}-value"
serializer_name = f"{topic_name}_serializer"
bootstrap_server = "192.168.59.100:30887"

sr_server = "http://localhost:8081"

client = SchemaRegistryClient({"url": sr_server})
topic_schema = client.get_schema(subject_name)

fp_avro_schema = schema.AvroSchema(topic_schema.schema.raw_schema)

avro_fp_serializer = FaustSerializer(client, serializer_name, fp_avro_schema)
faust.serializers.codecs.register(name=serializer_name, codec=avro_fp_serializer)

app = faust.App('sample_app', broker=bootstrap_server)
faust_topic = app.topic(topic_name, value_serializer=serializer_name)


count_table = app.Table(
    'count_table', default=int,
).hopping(
    timedelta(minutes=10),
    timedelta(minutes=5),
    expires=timedelta(minutes=10)
).relative_to_field(??????)


@app.agent(faust_topic)
async def process_fp(fps):
    async for fp in fps.group_by(lambda fp: fp["job_id"], name=f"{subject_name}.job_id"):     
        print(fp)

Luckily, stream's group_by can be called with callable object, so I can handle it with lambda but table's relative_to_field has no option such like that.

Upvotes: 0

Views: 171

Answers (1)

Thomas
Thomas

Reputation: 1219

Short answer: No, you're right, you cannot define relative_to_field without a FieldDescriptor. You can check the definiton of relative_to_field here. Then, this field is extracted here with a getattr, you need a faust.Record for this operation.

However, as you use Avro you may use the library dataclasses-avroschema to combine faust.Record and Avro in the same class. Thus, you will be able to use best of both world.

This library integrates well with faust as you can see in doc.

Upvotes: 0

Related Questions