partha1984
partha1984

Reputation: 91

Python Asyncio is not running new coroutine using asyncio.run_coroutine_threadsafe

1>Python Asyncio is not running new coroutine using asyncio.run_coroutine_threadsafe. Below is the code testing performed on Mac.
————————————————————————————————

import os    
import random      
import asyncio      
import logging     
from concurrent.futures import ThreadPoolExecutor      
     
os.environ['PYTHONASYNCIODEBUG'] = '1'      
logging.basicConfig(level=logging.WARNING)      
    
async def coro():      
    print("Coroutine {} is has started")      
    
      
async def main(loop):     
    print(" 1 ")    
    fut = asyncio.run_coroutine_threadsafe(coro(), loop)      
    print(f"Future  --")          
    print(" 2 ")      
    print(" Result ",fut.result())      
    print(" 3 ")      
    
if __name__== '__main__':      
    loop = asyncio.get_event_loop()      
    loop.set_debug(True)      
    loop.run_until_complete(main(loop))      

————————————————————————————————
Output:

 1       
Future -- <Future at 0x102a05358 state=pending>      
 2 

=============================================================================

2>The output is same when I run on a different executor also as shown below
—————————————————————————————————————

new_loop = asyncio.new_event_loop()      
new_loop.set_default_executor(ThreadPoolExecutor(max_workers=2))      
fut = asyncio.run_coroutine_threadsafe(coro(), new_loop) 

—————————————————————————————————————
Sample code:

import os      
import random      
import asyncio      
import logging      
from concurrent.futures import ThreadPoolExecutor      
      
os.environ['PYTHONASYNCIODEBUG'] = '1'      
logging.basicConfig(level=logging.WARNING)      
      
async def coro():      
    print("Coroutine {} is has started")      
      
      
async def main(loop):    
    print(" 1 ")    
    new_loop = asyncio.new_event_loop()    
    new_loop.set_default_executor(ThreadPoolExecutor(max_workers=2))    
    fut = asyncio.run_coroutine_threadsafe(coro(), new_loop)      
    print(f"Future  --")        
    print(" 2 ")      
    print(" Result ",fut.result())      
    print(" 3 ")      
    
if __name__== '__main__':      
    loop = asyncio.get_event_loop()    
    loop.set_debug(True)      
    loop.run_until_complete(main(loop))    

————————————————————————————————
Output:

 1
Future  -- <Future at 0x102f5d668 state=pending>    
 2     

Upvotes: 1

Views: 1963

Answers (2)

user4815162342
user4815162342

Reputation: 154886

The first snippet submits a coroutine to the event loop it is already running in. You don't need run_coroutine_threadsafe for that, you can just call asyncio.create_task. As mentioned in the other answer, the idea with run_coroutine_threadsafe is that you run it from your sync code in order to submit a task to an event loop running in another thread.

The other problem is that you never give a chance to the task to execute, as you don't await anything in main() and use run_until_complete(main()) at top-level. As a result, the code only runs the event loop until main() completes, not caring for the tasks it might have spawned in the background. To fix it, you should either await the spawned task or return it so it can be awaited from top-level.

The second snippet submits a coroutine to a freshly created event loop that no one is running, so again the coroutine doesn't get a chance to execute. To fix it, you should execute new_loop.run_forever in an separate thread.

Finally, the executor as set by set_default_executor doesn't serve to execute coroutines. In asyncio (and other Python async frameworks) coroutines all run in a single thread, the one that runs the event loop. The execute concurrently through the virtue of await transferring control back to the event loop. The executor you've set serves to execute blocking code passed to run_in_executor.

Upvotes: 1

RafalS
RafalS

Reputation: 6324

From asyncio.run_coroutine_threadsafe:

This function is meant to be called from a different OS thread than the one where the event loop is running

async def coro():
    print("Coroutine {} is has started")

def start_coro(loop):
    fut = asyncio.run_coroutine_threadsafe(coro(), loop)
    print(fut.result())

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.set_debug(True)
    Thread(target=start_coro, args=(loop,)).start()
    loop.run_forever()

Upvotes: 2

Related Questions