Reputation: 403
I have been trying to understand the asyncio
module in order to
implement a server. I was looking at this question, which seems
to ask a similar, if not the same question, however I am still trying
to grasp the workflow happening behind the sceens.
I have the following simple program that has two coroutines, one reading from the terminal and putting it into a Queue and one coroutine that waits for items in the queue and simply prints them back to the screen.
import asyncio
q = asyncio.Queue()
async def put():
while True:
await q.put(input()) #Input would be normaly something like client.recv()
await asyncio.sleep(1) #This is neccessarry but I dont understand why
async def get():
while True:
print(await q.get())
def run():
loop = asyncio.get_event_loop()
task1 = loop.create_task(put())
task2 = loop.create_task(get())
loop.run_forever()
run()
This program works as expected, however when one removes the await asyncio.sleep(1)
statement from the put
method, it stops working. I assume because the loop keeps eating up the thread and the message doesn't get pushed through. I don't understand why though because I would think input()
would be a blocking function and the coroutine should thus suspend until a new line is available on the tty.
The second problem is, if I use asyncio.get_event_loop()
in the run()
call, the interpreter warns me that there is no active loop, however, as stated in the documentation this call is deprecated and thus I tried replacing it with asyncio.new_event_loop()
. The programm still works the same, however I get a traceback upon KeyboardInterrupt (which does not happen when calling asyncio.get_event_loop()
)
Task was destroyed but it is pending!
task: <Task pending name='Task-1' coro=<put() running at test.py:10> wait_for=<Future pending cb=[Task.task_wakeup()]>>
Task was destroyed but it is pending!
task: <Task pending name='Task-2' coro=<get() running at test.py:15> wait_for=<Future pending cb=[Task.task_wakeup()]>>
Exception ignored in: <coroutine object get at 0x7f32f51369d0>
Traceback (most recent call last):
File "test.py", line 15, in get
File "/usr/lib64/python3.10/asyncio/queues.py", line 161, in get
File "/usr/lib64/python3.10/asyncio/base_events.py", line 745, in call_soon
File "/usr/lib64/python3.10/asyncio/base_events.py", line 510, in _check_closed
RuntimeError: Event loop is closed
A third variant I tried was to make the run method itself async and call it via the asyncio.run(run())
call.
import asyncio
q = asyncio.Queue()
async def put():
while True:
await q.put(input())
await asyncio.sleep(1)
async def get():
while True:
print(await q.get())
async def run():
loop = asyncio.get_event_loop()
task1 = loop.create_task(put())
task2 = loop.create_task(get())
await task1
asyncio.run(run())
This works just fine as well, however if I replace await task1
with await task2
,
again I get errors when I interrupt the program.
Why is the execution different between these and how is it supposed to be done in the end?
Upvotes: 2
Views: 1393
Reputation: 26941
As stated in the comments, with StreamReader
(your original code) problem #1 will work flawlessly and cause no issue. input()
does not give a chance for aio to switch coroutines, and you can try and limit the queue to a certain length if StreamReader
constantly has data.
For problem #2, during cleanup, Python uses the assigned loop for the current thread. Under the hood, asyncio.run()
and asyncio.get_event_loop()
assigns the loop to the main thread. When it doesn't find a loop, all hell breaks loose.
If you wish to assign it yourself, you can do so like that:
def run():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
task1 = loop.create_task(put())
task2 = loop.create_task(get())
loop.run_forever()
Keep in mind you're still missing some manual cleanup (i.e. shutdown_asyncgens
and shutdown_executor
) but that's for a different topic. Overall, using asyncio.run()
is usually the correct choice.
I'm unable to reproduce problem #3, both await task1
and await task2
work flawlessly.
Upvotes: 1