Torilla
Torilla

Reputation: 403

Python3 asyncio event loops and task cancelling

I have been trying to understand the asyncio module in order to implement a server. I was looking at this question, which seems to ask a similar, if not the same question, however I am still trying to grasp the workflow happening behind the sceens.

I have the following simple program that has two coroutines, one reading from the terminal and putting it into a Queue and one coroutine that waits for items in the queue and simply prints them back to the screen.

import asyncio

q = asyncio.Queue()

async def put():
    while True:
        await q.put(input())      #Input would be normaly something like client.recv()
        await asyncio.sleep(1)    #This is neccessarry but I dont understand why

async def get():
    while True:
        print(await q.get())

def run():
    loop = asyncio.get_event_loop()
    task1 = loop.create_task(put())
    task2 = loop.create_task(get())
    loop.run_forever()

run()

This program works as expected, however when one removes the await asyncio.sleep(1) statement from the put method, it stops working. I assume because the loop keeps eating up the thread and the message doesn't get pushed through. I don't understand why though because I would think input() would be a blocking function and the coroutine should thus suspend until a new line is available on the tty.


The second problem is, if I use asyncio.get_event_loop() in the run() call, the interpreter warns me that there is no active loop, however, as stated in the documentation this call is deprecated and thus I tried replacing it with asyncio.new_event_loop(). The programm still works the same, however I get a traceback upon KeyboardInterrupt (which does not happen when calling asyncio.get_event_loop())

Task was destroyed but it is pending!                                                         
task: <Task pending name='Task-1' coro=<put() running at test.py:10> wait_for=<Future pending cb=[Task.task_wakeup()]>>             
Task was destroyed but it is pending!                                                       
task: <Task pending name='Task-2' coro=<get() running at test.py:15> wait_for=<Future pending cb=[Task.task_wakeup()]>>               
Exception ignored in: <coroutine object get at 0x7f32f51369d0>                              
Traceback (most recent call last):                                                          
  File "test.py", line 15, in get     
  File "/usr/lib64/python3.10/asyncio/queues.py", line 161, in get                     
  File "/usr/lib64/python3.10/asyncio/base_events.py", line 745, in call_soon          
  File "/usr/lib64/python3.10/asyncio/base_events.py", line 510, in _check_closed     
RuntimeError: Event loop is closed                                 

A third variant I tried was to make the run method itself async and call it via the asyncio.run(run()) call.

import asyncio

q = asyncio.Queue()

async def put():
    while True:
        await q.put(input())
        await asyncio.sleep(1)


async def get():
    while True:
        print(await q.get())


async def run():
    loop = asyncio.get_event_loop()
    task1 = loop.create_task(put())
    task2 = loop.create_task(get())
    await task1

asyncio.run(run())

This works just fine as well, however if I replace await task1 with await task2, again I get errors when I interrupt the program. Why is the execution different between these and how is it supposed to be done in the end?

Upvotes: 2

Views: 1393

Answers (1)

Bharel
Bharel

Reputation: 26941

As stated in the comments, with StreamReader (your original code) problem #1 will work flawlessly and cause no issue. input() does not give a chance for aio to switch coroutines, and you can try and limit the queue to a certain length if StreamReader constantly has data.


For problem #2, during cleanup, Python uses the assigned loop for the current thread. Under the hood, asyncio.run() and asyncio.get_event_loop() assigns the loop to the main thread. When it doesn't find a loop, all hell breaks loose.

If you wish to assign it yourself, you can do so like that:

def run():
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    task1 = loop.create_task(put())
    task2 = loop.create_task(get())
    loop.run_forever()

Keep in mind you're still missing some manual cleanup (i.e. shutdown_asyncgens and shutdown_executor) but that's for a different topic. Overall, using asyncio.run() is usually the correct choice.


I'm unable to reproduce problem #3, both await task1 and await task2 work flawlessly.

Upvotes: 1

Related Questions