THK
THK

Reputation: 701

Correct way to call run_in_executor() for blocking code in asyncio loop

In the following code (CPython 3.9), I have an async client that's listening for messages. After receiving each message, if certain conditions are satisfied, it calls a blocking, CPU-intensive function gen_block.

Question 1: in the below example, gen_block blocks the loop (i.e. preventing further messages from being read by read_msg), despite being called from run_in_executor()... what am I missing here?

Question 2: once gen_block is called, I don't want it to be called again until the original call is completed. What's the correct way to handle that state in the asyncio framework?

async def main(args):
    reader, writer = await asyncio.open_connection(args.host, args.port
    try:
        while data := await read_msg(reader):
            handle_data(data, writer)
            if len(TXN_QUEUE) >= 3:
                await loop.run_in_executor(None, gen_block, writer)
    
    # except, finally, shutdown etc

Pointers to references / SO posts would be fine too; I couldn't find a relevant answer for this specific scenario of calling run_in_executor from within a loop.

Upvotes: 0

Views: 2144

Answers (1)

Matt Fowler
Matt Fowler

Reputation: 2743

When you say await loop.run_in_executor(None, gen_block, writer) the await means wait until loop_in_executor finishes its CPU heavy task and therefore is not any different than calling gen_block directly in the loop, it will block until it finishes. You can remove the await keyword and keep track of tasks in a list like so which should stop things from blocking, since this is already in a separate task:

async def main(args):
    tasks = []
    reader, writer = await asyncio.open_connection(args.host, args.port
    try:
        while data := await read_msg(reader):
            handle_data(data, writer)
            if len(TXN_QUEUE) >= 3:
                tasks.append(loop.run_in_executor(None, gen_block, writer))

# do something with the tasks list later, like asyncio.gather

However, given your requirement around waiting for gen_block to finish before it is called a second time, a queue could be a good approach. When you're ready to do CPU intensive work, you put the needed information to do it into a queue. Then, you have one worker task run your CPU intensive work which reads work to do from the queue and runs it in a separate thread. Since there is only one worker you'll only run one call to your CPU intensive work at a time, while other requests queue up behind it.

Here is a simple example of this to illustrate the idea. We create a server that on localhost that accepts simple text messages and then puts them into a queue. Then, we have one worker pull from the queue and run CPU intensive work in a separate thread. Note I use asyncio.to_thread here which was introduced in 3.9 and abstracts away managing the executor for you.

import asyncio
import functools
from asyncio import Queue


def cpu_intensive(data: str):
    print(f"Running intense work for {data}")
    i = 0
    while i < 100000000:
        i = i + 1
    print(f"Finished intense work for {data}")


async def worker(queue: Queue):
    while True:
        work_item = await queue.get()
        await asyncio.to_thread(cpu_intensive, work_item)


async def handle_connect(queue: Queue, reader, writer):
    while data := await reader.readline():
        print(f"Queueing {data}")
        queue.put_nowait(data)


async def main():
    queue = Queue()
    asyncio.create_task(worker(queue))
    server = await asyncio.start_server(functools.partial(handle_connect, queue), '127.0.0.1', 8000)

    async with server:
        await server.serve_forever()


asyncio.run(main())

Running this and sending a, b, c as three messages to your server one after another, you'll see output similar to the following:

Queueing b'a\r\n'
Running intense work for b'a\r\n'
Queueing b'b\r\n'
Queueing b'c\r\n'
Finished intense work for b'a\r\n'
Running intense work for b'b\r\n'
Finished intense work for b'b\r\n'
Running intense work for b'c\r\n'

Upvotes: 1

Related Questions