Reputation: 91
1>Python Asyncio is not running new coroutine using asyncio.run_coroutine_threadsafe. Below is the code testing performed on Mac.
————————————————————————————————
import os
import random
import asyncio
import logging
from concurrent.futures import ThreadPoolExecutor
os.environ['PYTHONASYNCIODEBUG'] = '1'
logging.basicConfig(level=logging.WARNING)
async def coro():
print("Coroutine {} is has started")
async def main(loop):
print(" 1 ")
fut = asyncio.run_coroutine_threadsafe(coro(), loop)
print(f"Future --")
print(" 2 ")
print(" Result ",fut.result())
print(" 3 ")
if __name__== '__main__':
loop = asyncio.get_event_loop()
loop.set_debug(True)
loop.run_until_complete(main(loop))
————————————————————————————————
Output:
1
Future -- <Future at 0x102a05358 state=pending>
2
=============================================================================
2>The output is same when I run on a different executor also as shown below
—————————————————————————————————————
new_loop = asyncio.new_event_loop()
new_loop.set_default_executor(ThreadPoolExecutor(max_workers=2))
fut = asyncio.run_coroutine_threadsafe(coro(), new_loop)
—————————————————————————————————————
Sample code:
import os
import random
import asyncio
import logging
from concurrent.futures import ThreadPoolExecutor
os.environ['PYTHONASYNCIODEBUG'] = '1'
logging.basicConfig(level=logging.WARNING)
async def coro():
print("Coroutine {} is has started")
async def main(loop):
print(" 1 ")
new_loop = asyncio.new_event_loop()
new_loop.set_default_executor(ThreadPoolExecutor(max_workers=2))
fut = asyncio.run_coroutine_threadsafe(coro(), new_loop)
print(f"Future --")
print(" 2 ")
print(" Result ",fut.result())
print(" 3 ")
if __name__== '__main__':
loop = asyncio.get_event_loop()
loop.set_debug(True)
loop.run_until_complete(main(loop))
————————————————————————————————
Output:
1
Future -- <Future at 0x102f5d668 state=pending>
2
Upvotes: 1
Views: 1963
Reputation: 154886
The first snippet submits a coroutine to the event loop it is already running in. You don't need run_coroutine_threadsafe
for that, you can just call asyncio.create_task
. As mentioned in the other answer, the idea with run_coroutine_threadsafe
is that you run it from your sync code in order to submit a task to an event loop running in another thread.
The other problem is that you never give a chance to the task to execute, as you don't await anything in main()
and use run_until_complete(main())
at top-level. As a result, the code only runs the event loop until main()
completes, not caring for the tasks it might have spawned in the background. To fix it, you should either await the spawned task or return it so it can be awaited from top-level.
The second snippet submits a coroutine to a freshly created event loop that no one is running, so again the coroutine doesn't get a chance to execute. To fix it, you should execute new_loop.run_forever
in an separate thread.
Finally, the executor as set by set_default_executor
doesn't serve to execute coroutines. In asyncio (and other Python async frameworks) coroutines all run in a single thread, the one that runs the event loop. The execute concurrently through the virtue of await
transferring control back to the event loop. The executor you've set serves to execute blocking code passed to run_in_executor
.
Upvotes: 1
Reputation: 6324
From asyncio.run_coroutine_threadsafe
:
This function is meant to be called from a different OS thread than the one where the event loop is running
async def coro():
print("Coroutine {} is has started")
def start_coro(loop):
fut = asyncio.run_coroutine_threadsafe(coro(), loop)
print(fut.result())
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.set_debug(True)
Thread(target=start_coro, args=(loop,)).start()
loop.run_forever()
Upvotes: 2