Ordani Sanchez

Reputation: 401

How to run a coroutine and wait for its result from a sync function when the loop is already running?

I have code like the following:

import asyncio

def render():
    loop = asyncio.get_event_loop()

    async def test():
        await asyncio.sleep(2)
        print("hi")
        return 200

    if loop.is_running():
        result = asyncio.ensure_future(test())
    else:
        result = loop.run_until_complete(test())

When the loop is not running, it is easy: loop.run_until_complete runs the coroutine and returns its result. But if the loop is already running (my blocking code is called from inside an app that is already running the loop), I cannot use loop.run_until_complete, since it raises an exception. When I call asyncio.ensure_future, the task gets scheduled and run, but I want to wait right there for the result. Does anybody know how to do this? The docs are not very clear on it.

I tried passing a concurrent.futures.Future, calling set_result inside the coro and then calling Future.result() in my blocking code, but it doesn't work: it blocks there and does not let anything else run. Any help would be appreciated.
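
For reference, the blocking behaviour can be reproduced in a few lines. This is only a minimal sketch, assuming the blocking call happens on the loop's own thread. The name blocking_render and the 0.5 second timeout are hypothetical, chosen for the demo, and asyncio.run_coroutine_threadsafe stands in for the hand-made concurrent.futures.Future described above.

```python
import asyncio
import concurrent.futures


async def test():
    await asyncio.sleep(0.1)
    return 200


def blocking_render(loop):
    # Hypothetical sync function, called on the thread that runs the loop.
    future = asyncio.run_coroutine_threadsafe(test(), loop)
    try:
        # Blocks the loop thread, so the loop never gets to run test().
        return future.result(timeout=0.5)
    except concurrent.futures.TimeoutError:
        future.cancel()  # give up on the coroutine that never started
        return None


async def main():
    loop = asyncio.get_running_loop()
    return blocking_render(loop)  # sync call made from inside the loop


outcome = asyncio.run(main())
print("outcome:", outcome)
```

The wait times out even though test() only needs 0.1 seconds: the thread that would have to run test() is the same thread that is stuck in result().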

Upvotes: 21

Views: 18379

Answers (3)

Ronaldinho

Reputation: 83

Here is my case: my whole program is async, but it calls a sync library, which then calls back into my async function.

This follows the answer by user4815162342.

import asyncio

async def asyncTask(k):
    ''' asynchronous task '''
    print("--start async task %s" % k)
    # await asyncio.sleep(3, loop=loop)
    await asyncio.sleep(3)
    print("--end async task %s." % k)
    key = "KEY#%s" % k
    return key


def my_callback():
    print("here i want to call my async func!")
    future = asyncio.run_coroutine_threadsafe(asyncTask(1), LOOP)
    return future.result()

def sync_third_lib(cb):
    print("here will call back to your code...")
    cb()

async def main():
    print("main start...")

    print("call sync third lib ...")
    await asyncio.to_thread(sync_third_lib, my_callback)
    # await loop.run_in_executor(None, func=sync_third_lib)
    print("another work...keep async...")
    await asyncio.sleep(2)

    print("done!")


# get_event_loop() without a running loop is deprecated; create the loop explicitly
LOOP = asyncio.new_event_loop()
LOOP.run_until_complete(main())

Upvotes: 2

AlQuemist

Reputation: 1304

Waiting Synchronously for an Asynchronous Coroutine

Once an asyncio event loop is started with loop.run_forever, it blocks the calling thread until loop.stop is called [see the docs]. Therefore, the only way to wait synchronously is to run the event loop on a dedicated thread, schedule the asynchronous function on that loop, and wait for it synchronously from another thread.

For this I have composed my own minimal solution following the answer by user4815162342. I have also added the parts for cleaning up the loop when all work is finished [see loop.close].

The main function in the code below runs the event loop on a dedicated thread and schedules several tasks on it, plus the one task whose result is awaited synchronously. The synchronous wait blocks until that result is ready. Finally, the loop is stopped and closed gracefully, and its thread is joined.

The dedicated thread and the functions stop_loop, run_forever_safe, and await_sync can be encapsulated in a module or a class.

For thread-safety considerations, see the section “Concurrency and Multithreading” in the asyncio docs.

import asyncio
import threading
#----------------------------------------

def stop_loop(loop):
    ''' stops an event loop '''
    loop.stop()
    print (".: LOOP STOPPED:", loop.is_running())

def run_forever_safe(loop):
    ''' run a loop forever and clean up after being stopped '''

    loop.run_forever()
    # NOTE: loop.run_forever returns after calling loop.stop

    #-- cancel all tasks and close the loop gracefully
    print(".: CLOSING LOOP...")
    # source: <https://xinhuang.github.io/posts/2017-07-31-common-mistakes-using-python3-asyncio.html>

    # asyncio.Task.all_tasks was removed in Python 3.9; use asyncio.all_tasks
    loop_tasks_all = asyncio.all_tasks(loop=loop)

    for task in loop_tasks_all: task.cancel()
    # NOTE: `cancel` does not guarantee that the Task will be cancelled

    for task in loop_tasks_all:
        if not (task.done() or task.cancelled()):
            try:
                # wait for task cancellations
                loop.run_until_complete(task)
            except asyncio.CancelledError: pass
    #END for
    print(".: ALL TASKS CANCELLED.")

    loop.close()
    print(".: LOOP CLOSED:", loop.is_closed())

def await_sync(task):
    ''' synchronously waits for a task '''
    # `task` is a concurrent.futures.Future: result() blocks without busy-waiting
    result = task.result()
    print(".: AWAITED TASK DONE")
    return result
#----------------------------------------

async def asyncTask(loop, k):
    ''' asynchronous task '''
    print("--start async task %s" % k)
    await asyncio.sleep(3)  # the `loop` argument was removed in Python 3.10
    print("--end async task %s." % k)
    key = "KEY#%s" % k
    return key

def main():
    loop = asyncio.new_event_loop() # construct a new event loop

    #-- closures for running and stopping the event-loop
    run_loop_forever = lambda: run_forever_safe(loop)
    close_loop_safe = lambda: loop.call_soon_threadsafe(stop_loop, loop)

    #-- make dedicated thread for running the event loop
    thread = threading.Thread(target=run_loop_forever)

    #-- add some tasks along with my particular task
    myTask = asyncio.run_coroutine_threadsafe(asyncTask(loop, 100200300), loop=loop)
    otherTasks = [asyncio.run_coroutine_threadsafe(asyncTask(loop, i), loop=loop)
                  for i in range(1, 10)]

    #-- begin the thread to run the event-loop
    print(".: EVENT-LOOP THREAD START")
    thread.start()

    #-- _synchronously_ wait for the result of my task
    result = await_sync(myTask) # blocks until task is done
    print("* final result of my task:", result) 

    #... do lots of work ...
    print("*** ALL WORK DONE ***")
    #========================================

    # close the loop gracefully when everything is finished
    close_loop_safe()
    thread.join()
#----------------------------------------

main()
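
The encapsulation mentioned above could look roughly like the class below. This is only a sketch under my own assumptions: the class name LoopThread and its methods are hypothetical, and it waits with Future.result() and cleans up with asyncio.all_tasks, so it targets current Python versions.

```python
import asyncio
import threading


class LoopThread:
    """Hypothetical helper: owns an event loop running on a dedicated thread."""

    def __init__(self):
        self._loop = asyncio.new_event_loop()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def _run(self):
        asyncio.set_event_loop(self._loop)
        try:
            self._loop.run_forever()  # returns after loop.stop() is called
        finally:
            # cancel whatever is still pending, then close the loop
            pending = asyncio.all_tasks(self._loop)
            for task in pending:
                task.cancel()
            if pending:
                self._loop.run_until_complete(
                    asyncio.gather(*pending, return_exceptions=True))
            self._loop.close()

    def await_sync(self, coro, timeout=None):
        """Schedule `coro` on the loop thread and block for its result."""
        future = asyncio.run_coroutine_threadsafe(coro, self._loop)
        return future.result(timeout)

    def close(self):
        """Stop the loop from outside its thread and wait for the cleanup."""
        self._loop.call_soon_threadsafe(self._loop.stop)
        self._thread.join()


async def async_task(k):
    await asyncio.sleep(0.1)
    return "KEY#%s" % k


runner = LoopThread()
try:
    result = runner.await_sync(async_task(7))  # blocks the calling thread only
    print("* final result of my task:", result)
finally:
    runner.close()
```

await_sync must only be called from a thread other than the loop thread; calling it from a coroutine running on the same loop would block that loop.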

Upvotes: 9

user4815162342

Reputation: 155216

To implement runner with the proposed design, you would need a way to single-step the event loop from a callback running inside it. Asyncio explicitly forbids recursive event loops, so this approach is a dead end.

Given that constraint, you have two options:

  1. make render() itself a coroutine;
  2. execute render() (and its callers) in a thread different from the one that runs the asyncio event loop.

Assuming #1 is out of the question, you can implement the #2 variant of render() like this:

def render():
    loop = _event_loop  # can't call get_event_loop()

    async def test():
        await asyncio.sleep(2)
        print("hi")
        return 200

    future = asyncio.run_coroutine_threadsafe(test(), loop)
    result = future.result()  # blocks this thread until test() finishes
    return result

Note that you cannot use asyncio.get_event_loop() in render because the event loop is not (and should not be) set for that thread. Instead, the code that spawns the runner thread must call asyncio.get_event_loop() and send it to the thread, or just leave it in a global variable or a shared structure.
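
To make the hand-off concrete, here is a minimal sketch of the surrounding code, assuming the loop is stored in a module-level global and render() is started on a worker thread with run_in_executor. The main() coroutine and the use of the default executor are illustrative assumptions, not part of the question's code.

```python
import asyncio

_event_loop = None  # set by the code that owns the loop, read by render()


async def test():
    await asyncio.sleep(0.1)
    print("hi")
    return 200


def render():
    # Runs on a worker thread, so blocking here does not stall the loop.
    future = asyncio.run_coroutine_threadsafe(test(), _event_loop)
    return future.result(timeout=5)


async def main():
    global _event_loop
    _event_loop = asyncio.get_running_loop()
    # Push the sync code to the default thread pool and await its result.
    return await _event_loop.run_in_executor(None, render)


result = asyncio.run(main())
print("render() returned", result)
```

The loop thread stays free while render() waits, which is what lets test() actually run.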

Upvotes: 12
