Reputation: 21
I am new to Python and coroutines, and I am trying to leverage Python's asyncio library to run a blocking function in parallel. I am using Python 3.8.6. I have a blocking function that takes a different input from an array of inputs, and I need the calls for each input to run in parallel.
For example, if this is the time taken for each function to complete:
blocking_function(5) - takes 5 seconds
blocking_function(3) - takes 3 seconds
blocking_function(2) - takes 2 seconds
I have tried this, but the calls still seem to run sequentially:
inputs = [5, 3, 2]

async def main():
    tasks = [asyncio.create_task(blocking_function(input)) for input in inputs]
    result = await asyncio.gather(*tasks)
    print(result)
There are no errors when running this, but it executes the functions sequentially rather than in parallel; this implementation takes 8 seconds. How do I get them to execute in parallel so that it's done in 5 seconds?
Upvotes: 0
Views: 148
Reputation: 110591
Any blocking function has to be wrapped in something that makes sense as an awaitable in an asynchronous environment.
The straightforward way to do that is to use the loop.run_in_executor call, which will run your blocking code in a separate thread (or even in a separate process) -
import asyncio
...

async def main():
    loop = asyncio.get_running_loop()
    # run_in_executor already returns an awaitable Future,
    # so there is no need to wrap it in create_task
    tasks = [loop.run_in_executor(None, blocking_function, input) for input in inputs]
    result = await asyncio.gather(*tasks)
    print(result)
Check the docs at https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.run_in_executor - you can optionally instantiate your own executor from concurrent.futures and pass it as the first argument, instead of passing None to use the default one.
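For example, if blocking_function is CPU-bound rather than just waiting on I/O, threads won't run it in parallel because of the GIL, but a ProcessPoolExecutor will. A minimal sketch (the max_workers value is just an illustrative choice):

import asyncio
import concurrent.futures

async def main():
    loop = asyncio.get_running_loop()
    # blocking_function must be picklable (e.g. defined at module
    # top-level) to be sent to the worker processes
    with concurrent.futures.ProcessPoolExecutor(max_workers=3) as pool:
        tasks = [loop.run_in_executor(pool, blocking_function, input) for input in inputs]
        result = await asyncio.gather(*tasks)
    print(result)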
In the comments below we see that, for the task at hand, the blocking function is not simply a "leaf" - it has to exchange data which should be fetched in the asyncio-loop thread.
One way to exchange data is to use queues, because reading an asyncio.Queue inside an asyncio task can be non-blocking (the threaded version has to resort to polling the queue periodically, though).
Since puts to an unbounded queue never block, you can use an asyncio.Queue instance to pass data from the blocking function into an asyncio task - which can then perform an I/O operation and put the response back into a queue.Queue instance, which is polled in the thread.
So, say:
import queue, asyncio
from functools import partial
...

def blocking_task(*args, incoming_queue, outgoing_queue):
    # code performing task setup with args
    for step in multi_steps:
        # step setup code
        ...
        # maybe repeat these 2 lines more than once,
        # so that external data can be pre-fetched:
        fetch_data_from = <external_data_source>
        outgoing_queue.put_nowait((incoming_queue, fetch_data_from))
        # this will block the thread until data is available:
        work_data = incoming_queue.get()
        # CPU intensive, blocking code with work_data
        ...
async def data_fetcher(incoming_queue):
    running_tasks = set()
    while True:
        outgoing_queue, fetch_data_from = await incoming_queue.get()
        task = asyncio.create_task(my_data_retriever_async_function(fetch_data_from))
        # need a trick here to eagerly bind the current value of
        # outgoing_queue to the callback for the task:
        task.add_done_callback(
            lambda task, queue=outgoing_queue:
                queue.put_nowait(task.result())
        )
        running_tasks.add(task)
        # clear references to completed data-fetching tasks;
        # asyncio.wait returns (done, pending) sets:
        _done, running_tasks = await asyncio.wait(running_tasks, timeout=0)

# for illustrative purposes - this function might not even
# be needed, if no error handling or pre- or post-
# processing is needed at this step - just
# call the external lib directly from `data_fetcher`
async def my_data_retriever_async_function(source):
    return await whatever_io_lib.fetch(source)
async def main():
    incoming_queue = asyncio.Queue()
    data_fetcher_task = asyncio.create_task(data_fetcher(incoming_queue))
    loop = asyncio.get_running_loop()
    tasks = set()
    for input in inputs:
        # create one queue per blocking task, so the fetched
        # data can be sent back to it:
        response_queue = queue.Queue()
        # we need partial to send named arguments
        call = partial(
            blocking_task,
            input,
            outgoing_queue=incoming_queue,
            incoming_queue=response_queue,
        )
        tasks.add(loop.run_in_executor(None, call))
    result = await asyncio.gather(*tasks)
    data_fetcher_task.cancel()
    print(result)
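As a side note on the queue=outgoing_queue default argument in data_fetcher above: that is the usual trick to make a lambda created in a loop capture the current value of a variable - a plain closure would only look the name up when the callback eventually runs. A minimal demonstration:

# plain closures all see the final value of i:
late = [lambda: i for i in range(3)]
print([f() for f in late])    # [2, 2, 2]

# an eagerly bound default argument freezes the value per lambda:
eager = [lambda i=i: i for i in range(3)]
print([f() for f in eager])   # [0, 1, 2]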
And this is actually a peculiar arrangement, only needed if the blocking function needs asynchronously obtainable I/O data in the middle of its processing.
A simpler design would be to move the for step in multi_steps: loop into an async function, and break the pre- and post-data-fetching parts into separate synchronous functions, each called with run_in_executor.
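A rough sketch of that simpler design - pre_step and post_step are hypothetical stand-ins for the synchronous parts of your code, and whatever_io_lib.fetch is the same placeholder used above:

import asyncio

def pre_step(step, input):
    # synchronous, CPU-bound setup work for this step;
    # returns where the external data should be fetched from
    ...

def post_step(step, work_data):
    # synchronous, CPU-intensive processing of the fetched data
    ...

async def process_one_input(input):
    loop = asyncio.get_running_loop()
    for step in multi_steps:
        fetch_data_from = await loop.run_in_executor(None, pre_step, step, input)
        work_data = await whatever_io_lib.fetch(fetch_data_from)
        await loop.run_in_executor(None, post_step, step, work_data)

async def main():
    await asyncio.gather(*(process_one_input(input) for input in inputs))

This keeps each step's ordering inside one coroutine, so no queues or callbacks are needed: the event loop drives the I/O, and only the CPU-bound parts go to the executor.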
I realize this sounds complex - either you are following these ideas up to here, which I hope, or you will probably need some paid consulting work - sorry, these answers can't meaningfully cover all possible alternatives.
Upvotes: 1