Kumar Govindam
Kumar Govindam

Reputation: 115

Combine asyncio.ensure_future and loop.run_until_complete in a single api call?

I have written a async function which collects multiple text data and does data processsing in a batch. After that, it returns the output, like this:

import sys
import asyncio

Model_runner():
    '''
    The model runner combines all the input coming to it and combines in a batch of 10 or 1 sec, which ever duration is less. 
    After combining, it does processing and returns the output 
    '''


loop = asyncio.get_event_loop()
model_obj = ModelRunner(loop)
loop.create_task(model_obj.model_runner())


async def process_text(text):
        out_ = await model_obj.process_input(text)
        return out_

To get the output, I am running the following code:

task1 = asyncio.ensure_future(process_text(text1))
task2 = asyncio.ensure_future(process_text(text2))
task3 = asyncio.ensure_future(process_text(text3))
task4 = asyncio.ensure_future(process_text(text4))
async_tasks = [task1, task2, task3, task4]
out1, out2 ,out3 ,out4 = loop.run_until_complete(asyncio.gather(*async_tasks))

Here, out1, out2, out3, and out4 are the output after processing the text data.

Here, I do not want to combine the task like [task1, task2, task3, task4] and then call the loop.run_until_complete to get the output. Instead, I am looking for a function like this:

out1 = func(text1)
out2 = func(text2) 
etc..

But, they should work in in non blocking way like asyncio.ensure_future. How can I do that. Thanks in advance.

Upvotes: 0

Views: 270

Answers (1)

Arthur Tacca
Arthur Tacca

Reputation: 9978

Two obvious options:

  • If you already have multiple threads, why bother with asyncio at all? Just make process_text a regular blocking function and call it from those threads.
  • Conversely, if you're using asyncio, why use multiple threads at all? Make your top-level tasks async and run them all in one thread.

If you really must use multiple threads and async functions:

  • Have a single thread running your asyncio loop and the worker threads you already mentioned, and use loop.call_soon_threadsafe in the threads to force the asyncs function to run in the async thread. If you want to get the result back to the thread you can use a queue.Queue to send the result (or results) back.
  • This final option is the worst one possible and almost certainly not what you want, but I mention it for completeness: start a separate asyncio event loop from each thread that needs it and use those to run your async functions in the worker threads directly.

Upvotes: 1

Related Questions