Nilam Dhatrak

Reputation: 61

How to share faust table between multiple agents or faust timers?

I'm trying to publish a Faust table's data (a count) to a Kafka topic at a fixed time interval. The timer works when I publish a simple string, but it somehow cannot access the table's data. Here is the code for the timer and the agent:

@app.timer(interval=10.0)
async def publish_to_anomaly_topic():
    await anomaly_topic.send(
        value=str(page_views['total'].value())
    )


@app.agent(page_view_topic)
async def count_page_views(views):
    async for view in views.group_by(PageView.id):
        total = 0
        page_views[view.id] += 1
        for everykey in list(page_views.keys()):
            if everykey != 'total':
                total += page_views[everykey].value()
        page_views['total'] = total

The agent is working fine. I am able to see the values correctly.

Upvotes: 5

Views: 2133

Answers (2)

Ethan Kulla

Reputation: 333

I found this question while trying to do the same thing, and here is how I worked it out.

https://faust.readthedocs.io/en/latest/userguide/tables.html

You cannot modify a table outside of a stream operation; this means that you can only mutate the table from within an async for event in stream: block. We require this to align the table’s partitions with the stream’s, and to ensure the source topic partitions are correctly rebalanced to a different worker upon failure, along with any necessary table partitions.

Modifying a table outside of a stream will raise an error:

The documentation says that you are not able to access/modify the table outside of a stream operation.

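To get around this, you can split your timer function into two parts: a timer that only sends a trigger message, and an agent that reads the table when the trigger arrives: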

@app.timer(10)
async def my_timer_function():
    # The value does not matter as much as the send operation itself
    await my_calling_function.send(value="send data now!")


@app.agent()
async def my_calling_function(stream_from_timer_func):
    async for message in stream_from_timer_func:
        print(message)  # this will print "send data now!"
        table_data = my_table['key']
        # Here is where you can access your table data and finish sending
        # the message to the topic you want
        await my_topic.send(value=table_data)

As you can see, if you use the timer function to send a message to an agent, you are able to access the table you want; it just has to happen inside an "async for event in stream:" block of code.
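The control flow of this relay can also be sketched without Faust or Kafka. The following is only an illustration under stated assumptions: an asyncio.Queue stands in for the agent's channel and for the output topic, a plain dict stands in for the table, and the names timer, agent, and run_relay are hypothetical. It shows the shape of the pattern (the timer only sends a trigger, the table is read inside the consuming loop), not Faust's actual behaviour.

```python
import asyncio

# Stand-ins (assumptions, not Faust objects): a dict plays the table.
my_table = {"key": 42}


async def timer(inbox: asyncio.Queue, ticks: int, interval: float) -> None:
    """Plays the role of the timer: it only sends a trigger message."""
    for _ in range(ticks):
        await asyncio.sleep(interval)
        await inbox.put("send data now!")
    await inbox.put(None)  # sentinel so this sketch can terminate


async def agent(inbox: asyncio.Queue, outbox: asyncio.Queue) -> None:
    """Plays the role of the agent: the table is read inside the loop."""
    while True:
        message = await inbox.get()
        if message is None:
            break
        # The table access happens here, in the message-handling loop.
        await outbox.put(my_table["key"])


async def run_relay(ticks: int = 3, interval: float = 0.01) -> list:
    """Run timer and agent together and collect what was 'published'."""
    inbox, outbox = asyncio.Queue(), asyncio.Queue()
    await asyncio.gather(timer(inbox, ticks, interval), agent(inbox, outbox))
    return [outbox.get_nowait() for _ in range(outbox.qsize())]


if __name__ == "__main__":
    print(asyncio.run(run_relay()))
```

In a real Faust app the sentinel is unnecessary, since the agent's stream runs for the lifetime of the worker.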

Upvotes: 2

Nilam Dhatrak

Reputation: 61

After a lot of experimentation, it turned out that you can't access a table's values from an app timer (even if you specify the relative_to_field option when creating the table). A workaround is to create another table that stores the timestamps of your messages and to use those timestamps in your business logic, inside the agent:

if view.timestamp - page_views_timer[view.id + '_first_timestamp'] > 60:
    # Replace the empty dict with the data to be sent
    await anomaly_topic.send(value={})

where page_views_timer is the new table created.
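The timestamp check can be tried out in isolation with a small sketch. This is an illustration under assumptions, not the original code: plain dicts stand in for the two Faust tables, PageView is a hypothetical dataclass with a timestamp in seconds, and the function process is an invented name. Resetting the stored timestamp after a publish is also an assumption about how the interval should restart.

```python
from dataclasses import dataclass


@dataclass
class PageView:
    id: str
    timestamp: float  # seconds; assumed field, as in the snippet above


# Plain dicts standing in for the two Faust tables (assumption).
page_views = {}
page_views_timer = {}


def process(view: PageView, window: float = 60.0):
    """Count the view; return the count once more than `window` seconds
    have passed since the first view stored for this id, else None."""
    key = view.id + "_first_timestamp"
    page_views[view.id] = page_views.get(view.id, 0) + 1
    # Remember the first timestamp seen for this id.
    first_seen = page_views_timer.setdefault(key, view.timestamp)
    if view.timestamp - first_seen > window:
        page_views_timer[key] = view.timestamp  # start a new interval
        return page_views[view.id]  # value that would be published
    return None
```

Because the check runs only when a message arrives, nothing is published for an id while no new views come in for it. That is the trade-off of this approach compared with a real timer.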

Upvotes: 0
