Minix
Minix

Reputation: 267

How to run a coroutine only once at a time, no matter how many times it's called?

Let's say I have an electronic billboard, that is supposed to display up to date information, that could change more than once per second. The billboard's update routine takes about 10 seconds, so calling and blocking it everytime is not ideal.

So let's make it an asyncio coroutine and call it with asyncio.ensure_future. That would make it possible to have multiple update coroutines running at the same time, though, maybe even introducing race conditions, where the last update is overwritten by a delayed former update.

Is there a neat, pythonic way to handle this?

I thought about having an internal future, which, when update is called, can either be started or waited on to have finished, before calling it again. But wouldn't that block the updating again?

# create initial future and set to done
_long_update_future = asyncio.get_event_loop.create_future()
_long_update_future.set_result(0)

# takes a long time, so run asynchronously
async def long_update_function(info):
  ...

# only run the long update function one at a time
async def update(new_info):
  if not _long_update_future.done():
    # if future is already running, then wait for it first
    await _long_update_future

  _long_update_future = asyncio.ensure_future(long_update_function(new_info)


async def main():
  while True:
    new_info = block_until_new_info()

    asyncio.ensure_future(update(new_info))

Upvotes: 1

Views: 1131

Answers (1)

Paul Cornelius
Paul Cornelius

Reputation: 10916

You could use a class with a couple of boolean flags to regulate the updates. Something like this:

#! python3.8

import asyncio
import random

class Updater:
    def __init__(self):
        self.in_process = False
        self.pending = False
        
    async def request(self):
        if self.in_process:
            self.pending = True
            return
        self.pending = False
        self.in_process = True
        await asyncio.sleep(2.0)  # Ten seconds is too boring
        self.in_process = False
        print("...updated")
        if self.pending:
            asyncio.create_task(self.request())
        
async def main():
    up = Updater()
    while True:
        seconds = random.random() * 4.0
        await asyncio.sleep(seconds)
        print("request")
        asyncio.create_task(up.request())
        
asyncio.run(main())
        

When the first request comes in, the updater runs. If another request comes in while the first one is in process, it sets the "pending" flag and returns.

At the end of the update function, it checks to see if any new requests are pending; if so, it schedules itself again. If not, it's all finished. In that case, it doesn't run again until the main() function issues a new request.

The "pending" flag is, in effect, a queue of requests with a maximum queue depth of 1. Multiple requests are serialized so no more than one is running at a time. It doesn't care if requests arrive faster than it can process; it processes what it can and discards the rest.

Upvotes: 2

Related Questions