Reputation: 3
I am new to Faust and want to keep track of how many messages I received in the last n seconds, updating this count every l seconds. I thought a hopping table would be suitable for this purpose.
For testing, I am producing one message per second to my topic. With n = 5 and l = 1, I expected the following code to count up from one to five and then stay at five as long as messages keep coming in:
hopping_table = app.Table("hopping_table", default=int).hopping(5, 1)

@app.agent(topic)
async def process(stream):
    async for value in stream:
        hopping_table["sum"] += 1
        print(f"sum: {hopping_table['sum'].value()}")
But as a result, I get:
[2024-02-04 22:50:55,092] [20956] [WARNING] sum: 1
[2024-02-04 22:50:56,094] [20956] [WARNING] sum: 1
[2024-02-04 22:50:57,096] [20956] [WARNING] sum: 1
[2024-02-04 22:50:58,098] [20956] [WARNING] sum: 1
[2024-02-04 22:50:59,099] [20956] [WARNING] sum: 1
[2024-02-04 22:51:00,101] [20956] [WARNING] sum: 1
Can I achieve what I want with hopping windows, or is my general understanding wrong? Unfortunately, I didn't find much specific information about Faust's sliding windows.
Thanks in advance
Upvotes: 0
Views: 142
Reputation: 339
Your understanding of hopping windows is correct and it can be achieved using a counter. How did you get on with your solution?
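For reference, the behaviour described in the question (count up to five, then stay at five) can be modelled without any stream-processing library. The sketch below is plain Python, not Faust: `SlidingCounter` and its `add` method are made-up names for illustration, and the timestamps are simply the integers 0 to 7 standing in for one message per second.

```python
from collections import deque


class SlidingCounter:
    """Count events whose timestamp falls within the last `size` seconds."""

    def __init__(self, size: float) -> None:
        self.size = size
        self._timestamps = deque()

    def add(self, timestamp: float) -> int:
        """Record one event and return how many events are inside the window."""
        self._timestamps.append(timestamp)
        # Drop events that are `size` seconds old or older.
        while self._timestamps and self._timestamps[0] <= timestamp - self.size:
            self._timestamps.popleft()
        return len(self._timestamps)


counter = SlidingCounter(size=5)
counts = [counter.add(t) for t in range(8)]  # one message per second
print(counts)  # [1, 2, 3, 4, 5, 5, 5, 5]
```

This keeps every timestamp in memory for the length of the window, which is fine for a test like this but may not suit a high-volume topic.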
Faust is a community-driven project and whilst it’s stable, as you've found, its implementation and documentation of window operations can be challenging for newcomers.
FYI, there are other open source Python alternatives to Faust that focus on the Python developer experience. I work on Quix Streams; like Faust, it is a pure Python library, doesn't require a server-side cluster, and has good adoption. There's a growing team of maintainers and, most importantly, an active community in Slack, so responses to support requests are usually very quick. We don't just discuss our own library, so I invite everyone interested in working with Python and Kafka to join, regardless of skill level.
For future reference on windowing, the open source project's lead engineer wrote a blog post explaining windowing in stream processing. I hope this will be useful.
Upvotes: 2
Reputation: 529
I too have encountered this hopping-window behaviour. The only actual answer I found was in the article Stream Processing with Python, Kafka & Faust. I completely agree with its author that windowed tables are inadequately explained in the Faust documentation and often lead to confusion.
The author used the following function to update hopping-window tables:
async def update_table(events, key, window_wrapper, inner_table):
    t = window_wrapper.get_timestamp()
    for window_range in inner_table._window_ranges(t):
        prev = inner_table[(key, window_range)]
        prev.extend(events)
        inner_table[(key, window_range)] = prev
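To see why updating every window range matters, here is a rough model of the same pattern in plain Python. It does not use Faust: `window_ranges` is a hypothetical stand-in for `_window_ranges`, the boundary arithmetic is my assumption of how overlapping windows are laid out, and a `defaultdict` plays the role of the inner table. It counts events instead of collecting them in lists, to match the question.

```python
from collections import defaultdict

SIZE, STEP = 5, 1  # same values as hopping(5, 1) in the question


def window_ranges(timestamp: int) -> list:
    """All (start, end) windows of length SIZE, aligned to STEP, containing timestamp."""
    latest_start = (timestamp // STEP) * STEP
    starts = range(latest_start - SIZE + STEP, latest_start + STEP, STEP)
    return [(start, start + SIZE) for start in starts]  # end is exclusive


table = defaultdict(int)  # stands in for the inner table, keyed by (key, window_range)
oldest, newest = [], []
for t in range(8):  # one message per second
    ranges = window_ranges(t)
    for window_range in ranges:  # update every window the event falls into
        table[("sum", window_range)] += 1
    oldest.append(table[("sum", ranges[0])])   # window covering the last SIZE seconds
    newest.append(table[("sum", ranges[-1])])  # window that has only just opened

print(oldest)  # [1, 2, 3, 4, 5, 5, 5, 5]
print(newest)  # [1, 1, 1, 1, 1, 1, 1, 1]
```

In this model the newest window only ever holds the event that opened it, which would match the constant `sum: 1` in the question if `.value()` reads that window. I have not confirmed that against the Faust source, so treat it as a plausible explanation rather than a verified one.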
Upvotes: -1