Rohit
Rohit

Reputation: 1101

Running Faust Agent Synchronously

Check the below code

@app.agent()
async def process(stream):
    async for value in stream.take(5000, within=5):
        process(value)

The agent takes 5000 records within 5 seconds asynchronously and process them. I don't want the agent to pick another 5000 thousand records until the processing of previous one is finished. Basically I want to run the agent Synchronously. Is there a way we can do it?

Upvotes: 1

Views: 1335

Answers (2)

Rohit
Rohit

Reputation: 1101

I tried with the following code to see whether the worker is executing second batch of record while the processing of first batch has not yet finished

@app.agent()
async def process(stream):
    async for value in stream.take(5000, within=5):
        print(1)
        await async.sleep(30)

The worker printed 1 and waited for 30 seconds to print 2. The await statement gives control back to the event loop but in this case it waited which implies that the batches are executed one after the another. Hence this is synchronous.

PS. Committing offset, rebalancing, monitoring etc are asynchronous operations which are handled by event loop.

Upvotes: 0

BWStearns
BWStearns

Reputation: 2706

I think you could set the concurrency to 1 on the agent and that'd effectively render it synchronous.

You might also find modifying the topic partitions to be useful if you do that but I don't have a complete understanding of the relationship between these two settings (just wanted to point out a potentially useful avenue).

Upvotes: 1

Related Questions