I'm currently using Schema Registry and Faust to process stream data.
The reason I try to avoid using faust.Record is that the schema can change dynamically, and I don't want to change the code (the class inheriting from faust.Record) every time that happens.
But without faust.Record, there seem to be many restrictions. For example, app.Table's relative_to_field requires a FieldDescriptorT, but this class looks strongly coupled with faust.Record.
Here is the code:
import faust
from datetime import timedelta
from pydantic_avro.base import AvroBase
from schema_registry.client import SchemaRegistryClient, schema
from schema_registry.serializers.faust import FaustSerializer

topic_name = "practice4"
subject_name = f"{topic_name}-value"
serializer_name = f"{topic_name}_serializer"
bootstrap_server = "192.168.59.100:30887"
sr_server = "http://localhost:8081"

# Fetch the latest value schema from the registry and register an Avro codec for it
client = SchemaRegistryClient({"url": sr_server})
topic_schema = client.get_schema(subject_name)
fp_avro_schema = schema.AvroSchema(topic_schema.schema.raw_schema)
avro_fp_serializer = FaustSerializer(client, serializer_name, fp_avro_schema)
faust.serializers.codecs.register(name=serializer_name, codec=avro_fp_serializer)

app = faust.App('sample_app', broker=bootstrap_server)
# Values arrive as plain dicts, because no faust.Record model is used
faust_topic = app.topic(topic_name, value_serializer=serializer_name)

# 10-minute hopping windows that advance every 5 minutes
count_table = app.Table(
    'count_table', default=int,
).hopping(
    timedelta(minutes=10),
    timedelta(minutes=5),
    expires=timedelta(minutes=10),
).relative_to_field(??????)  # <- what can go here without a faust.Record?

@app.agent(faust_topic)
async def process_fp(fps):
    # group_by accepts a callable, so a lambda on the dict works here
    async for fp in fps.group_by(lambda fp: fp["job_id"], name=f"{subject_name}.job_id"):
        print(fp)
Luckily, the stream's group_by can be called with a callable object, so I can handle it with a lambda, but the table's relative_to_field has no such option.
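To make concrete what the table definition above computes, here is a plain-Python sketch (no Faust involved) of hopping-window assignment with the question's parameters, where the timestamp is picked from a dict by a callable, the same way group_by picks the key. It assumes windows start on multiples of the step and uses a hypothetical `event_time` field holding epoch seconds; `hopping_windows` and `windows_for_record` are illustrative helpers, not Faust APIs, and this is not Faust's internal window code.

```python
from datetime import timedelta
from typing import Any, Callable, Dict, List, Tuple

# Window parameters copied from the question's table definition.
SIZE = timedelta(minutes=10)
STEP = timedelta(minutes=5)


def hopping_windows(ts: float,
                    size: timedelta = SIZE,
                    step: timedelta = STEP) -> List[Tuple[float, float]]:
    """Return every [start, end) hopping window containing ts (epoch seconds).

    Windows are assumed to start on multiples of `step`; this is a generic
    illustration, not a copy of Faust's implementation.
    """
    size_s, step_s = size.total_seconds(), step.total_seconds()
    start = (ts // step_s) * step_s  # latest window start <= ts
    windows = []
    while start > ts - size_s:  # [start, start + size) still covers ts
        windows.append((start, start + size_s))
        start -= step_s
    return sorted(windows)


def windows_for_record(record: Dict[str, Any],
                       get_ts: Callable[[Dict[str, Any]], float]
                       ) -> List[Tuple[float, float]]:
    """Pick the timestamp with a callable (like group_by's lambda), then window it."""
    return hopping_windows(get_ts(record))


if __name__ == "__main__":
    # "event_time" is a hypothetical field name.
    record = {"job_id": "job-1", "event_time": 720.0}
    print(windows_for_record(record, lambda r: r["event_time"]))
    # [(300.0, 900.0), (600.0, 1200.0)]
```

With a 10-minute size and a 5-minute step, every event lands in exactly two overlapping windows, which is why the choice of timestamp source (message time versus a field in the value) matters for the counts.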
Short answer: No, you're right: you cannot define relative_to_field without a FieldDescriptor. You can check the definition of relative_to_field here. The field is then extracted here with a getattr, so you need a faust.Record for this operation.
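The getattr point can be illustrated without Faust. In the sketch below, `FieldDescriptorSketch` is a hypothetical stand-in (not Faust's real FieldDescriptor) that reads its field with getattr, as described above; a dataclass plays the role of a faust.Record subclass, and a plain dict plays the role of the value the Avro serializer produces.

```python
from dataclasses import dataclass
from typing import Any


class FieldDescriptorSketch:
    """Hypothetical stand-in for a Faust field descriptor.

    It reads the field from an event value with getattr, so the value
    must expose the field as an attribute.
    """

    def __init__(self, name: str) -> None:
        self.name = name

    def getattr(self, value: Any) -> Any:
        return getattr(value, self.name)


def can_extract(descriptor: FieldDescriptorSketch, value: Any) -> bool:
    """True if the descriptor can read its field from value via getattr."""
    try:
        descriptor.getattr(value)
    except AttributeError:
        return False
    return True


@dataclass
class JobEvent:
    # Hypothetical model playing the role of a faust.Record subclass.
    job_id: str
    event_time: float


event_time = FieldDescriptorSketch("event_time")

if __name__ == "__main__":
    print(can_extract(event_time, JobEvent("job-1", 720.0)))  # True
    # A deserialized dict keeps fields as keys, not attributes.
    print(can_extract(event_time, {"job_id": "job-1", "event_time": 720.0}))  # False
```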
However, since you use Avro, you may use the library dataclasses-avroschema to combine faust.Record and Avro in the same class. This way you get the best of both worlds.
This library integrates well with Faust, as you can see in the docs.
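Since the question's concern is a schema that changes dynamically, one possible complement is to build the model class at runtime from the schema fetched from the registry instead of editing it by hand. The stdlib sketch below assumes the raw schema is a dict with an Avro "record" shape, maps only primitive types and simple nullable unions, and uses `dataclasses.make_dataclass`; `class_from_avro` and `python_type` are hypothetical helper names. Whether the same annotations can be used to create a faust.Record subclass at runtime (for example with `type()`) is an assumption not verified here.

```python
from dataclasses import fields, make_dataclass
from typing import Any, Dict, Optional

# Partial mapping of Avro primitive types to Python types (assumption:
# logical types, records, arrays, enums and maps are not handled).
AVRO_TO_PY = {
    "null": type(None),
    "boolean": bool,
    "int": int,
    "long": int,
    "float": float,
    "double": float,
    "bytes": bytes,
    "string": str,
}


def python_type(avro_type: Any) -> Any:
    """Translate an Avro field type into a Python annotation."""
    if isinstance(avro_type, list):  # union, e.g. ["null", "string"]
        non_null = [t for t in avro_type if t != "null"]
        if len(non_null) == 1:
            return Optional[python_type(non_null[0])]
        return Any
    if isinstance(avro_type, str):
        return AVRO_TO_PY.get(avro_type, Any)
    return Any  # complex types (given as dicts) stay untyped in this sketch


def class_from_avro(raw_schema: Dict[str, Any]) -> type:
    """Build a dataclass whose fields mirror an Avro record schema."""
    spec = [(f["name"], python_type(f["type"])) for f in raw_schema["fields"]]
    return make_dataclass(raw_schema["name"], spec)


if __name__ == "__main__":
    # Hypothetical schema; in the question it would come from the registry.
    raw = {
        "type": "record",
        "name": "JobEvent",
        "fields": [
            {"name": "job_id", "type": "string"},
            {"name": "event_time", "type": "double"},
        ],
    }
    JobEvent = class_from_avro(raw)
    print([f.name for f in fields(JobEvent)])  # ['job_id', 'event_time']
    print(JobEvent(job_id="job-1", event_time=720.0))
```

Rebuilding the class on startup keeps the code unchanged when the registry schema evolves, at the cost of losing static type checking on the model.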