Reputation: 11
I have a Kafka topic we will call ingest that receives an entry every x seconds. I have a process that I want to run on this data, but it has to be run on 100 events at a time. Thus, I want to batch the entries together and send them to a new topic called batched-ingest. The two topics will look like this...
ingest = [entry, entry, entry, ...]
batched-ingest = [[entry_0, entry_1, ..., entry_99]]
What is the correct way to do this using Faust? The solution I have right now is this...
import faust

app = faust.App("explore", value_serializer="raw")
ingest = app.topic('ingest')
ingest_batch = app.topic('ingest-batch')

@app.agent(ingest, sink=[ingest_batch])
async def test(stream):
    # Collect up to 10 events per batch, waiting at most 1000 seconds
    # for the batch to fill before yielding what has arrived so far.
    async for values in stream.take(10, within=1000):
        yield values
I am not sure if this is the correct way to do this in Faust. If so, what should I set within to in order to make it always wait until len(values) == 100?
Upvotes: 1
Views: 1533
Reputation: 163
As mentioned in the Faust take documentation, if you omit within from take(100, within=10), the code will block forever if there are 99 messages and the hundredth message never arrives. To solve this, add a within timeout so that up to 100 values are processed within 10 seconds; that way, if there is a 10-second period with no events received, the agent will still process what it has gathered.
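A minimal sketch of the full agent with that timeout applied (topic and agent names are taken from the question; the broker URL is an assumption):

import faust

# The broker URL here is an assumption; point it at your own Kafka cluster.
app = faust.App("explore", broker="kafka://localhost:9092", value_serializer="raw")
ingest = app.topic('ingest')
ingest_batch = app.topic('ingest-batch')

@app.agent(ingest, sink=[ingest_batch])
async def test(stream):
    # Yield a batch as soon as 100 events have arrived, or after 10 seconds,
    # whichever comes first, so a partial batch never waits indefinitely.
    async for values in stream.take(100, within=10):
        yield values

Note that with within=10, batches emitted during quiet periods may contain fewer than 100 entries; take cannot guarantee exactly 100 without risking blocking forever, so if your process strictly requires full batches you would need to buffer short batches yourself.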
Upvotes: 2