Reputation: 18246
I have a thread that starts (and eventually stops) an asyncio loop like so:
class Ook(Thread):
    […]

    def run(self):
        try:
            self._logger.debug("Asyncio loop runs forever.")
            self.loop.run_forever()
        except Exception as ex:
            # We do want to see ALL unhandled exceptions here.
            self._logger.error("Exception raised: %s", ex)
            self._logger.exception(ex)
        finally:
            # Stop the loop!
            self._logger.warn('Closing asyncio event loop.')
            self.loop.run_until_complete(self.loop.shutdown_asyncgens())
            self.loop.close()

    def stop(self):
        self._logger.info("Thread has been asked to stop!")
        if self.loop.is_running():
            self._logger.debug("Asked running asyncio loop to stop.")
            for task in asyncio.Task.all_tasks():
                self.loop.call_soon_threadsafe(task.cancel)
            self.loop.call_soon_threadsafe(self.loop.stop)
A silly (?) unit test to check that this works is:
@pytest.mark.asyncio
async def test_start_and_stop_thread():
    sut = Ook()
    sut.start()
    if sut.isAlive():
        sut.stop()
        sut.join()
    assert not sut.isAlive()
    assert not sut.loop.is_running()
This does not work because asyncio.CancelledError is raised… Catching it anywhere in the stop method does not seem to help.
If I run the test code without the @pytest.mark.asyncio marker, I get a message saying "Task was destroyed but it is pending!".
What am I doing wrong?
Upvotes: 0
Views: 3525
Reputation: 503
We have several issues here.
Task.cancel() raises an asyncio.CancelledError inside the coroutine. You should add a "try/except CancelledError" to your coroutines to handle that exception.
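As a rough illustration of that first option, here is a minimal sketch of a coroutine that catches the cancellation, cleans up, and re-raises. The names worker, main and cleanup_log are made up for this example and are not from the question's code.

```python
import asyncio

cleanup_log = []  # hypothetical list, used only to observe the cleanup


async def worker():
    """Hypothetical long-running coroutine that tidies up when cancelled."""
    try:
        await asyncio.sleep(3600)
    except asyncio.CancelledError:
        cleanup_log.append("worker cleaned up")
        raise  # re-raise so the task is still marked as cancelled


async def main():
    task = asyncio.create_task(worker())
    await asyncio.sleep(0)  # give the worker a chance to reach its await
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass  # expected: the task was cancelled on purpose
    return task.cancelled()


loop = asyncio.new_event_loop()
try:
    was_cancelled = loop.run_until_complete(main())
finally:
    loop.close()
```

Re-raising is a design choice here: if the coroutine swallows the exception instead, the task finishes normally and task.cancelled() reports False.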
Another way is to suppress the CancelledError exception in stop():
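import asyncio
from asyncio import CancelledError
from contextlib import suppress

def stop(self):
    self._logger.info("Thread has been asked to stop!")
    if self.loop.is_running():
        self._logger.debug("Asked running asyncio loop to stop.")
        self.loop.call_soon_threadsafe(self.loop.stop)
        # asyncio.all_tasks() needs Python 3.7+; older versions use
        # asyncio.Task.all_tasks(self.loop), which was removed in 3.9.
        for task in asyncio.all_tasks(self.loop):
            task.cancel()
            # Swallow the CancelledError that surfaces when the task finishes.
            # run_until_complete() raises RuntimeError while the loop is still
            # running, so this only works once the loop has actually stopped.
            with suppress(CancelledError):
                self.loop.run_until_complete(task)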
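To see the suppress pattern in isolation, here is a small single-threaded sketch. It assumes the loop has already stopped before run_until_complete() is called; ticker is a hypothetical task invented for the example.

```python
import asyncio
from contextlib import suppress


async def ticker():
    """Hypothetical task that would otherwise run forever."""
    while True:
        await asyncio.sleep(0.01)


loop = asyncio.new_event_loop()
task = loop.create_task(ticker())

# Run the loop briefly, then let it stop itself.
loop.call_later(0.05, loop.stop)
loop.run_forever()

# The loop is stopped now, so run_until_complete() is allowed again.
task.cancel()
with suppress(asyncio.CancelledError):
    loop.run_until_complete(task)

task_cancelled = task.cancelled()
loop.close()
```

Because the task is driven to completion before the loop is closed, the "Task was destroyed but it is pending!" message should not appear for it.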
Also remember to close all asynchronous generators with:
loop.run_until_complete(loop.shutdown_asyncgens())
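Putting the pieces together, one possible arrangement is to let stop() only ask the loop to stop, and to do the cancelling and draining inside run() after run_forever() has returned, so everything that touches tasks happens in the loop's own thread. This is only a sketch under those assumptions: LoopThread and _idle are hypothetical stand-ins for the question's class, and asyncio.all_tasks() requires Python 3.7 or later.

```python
import asyncio
import threading


class LoopThread(threading.Thread):
    """Hypothetical stand-in for the question's thread class."""

    def __init__(self):
        super().__init__()
        self.loop = asyncio.new_event_loop()

    async def _idle(self):
        # Placeholder task; a real thread would schedule its own work.
        await asyncio.sleep(3600)

    def run(self):
        asyncio.set_event_loop(self.loop)
        self.loop.create_task(self._idle())
        try:
            self.loop.run_forever()
        finally:
            # run_forever() has returned, so the loop is stopped and this
            # thread may drive it again to let cancelled tasks finish.
            pending = asyncio.all_tasks(self.loop)
            for task in pending:
                task.cancel()
            # return_exceptions=True collects each CancelledError as a
            # result instead of raising it here.
            self.loop.run_until_complete(
                asyncio.gather(*pending, return_exceptions=True)
            )
            self.loop.run_until_complete(self.loop.shutdown_asyncgens())
            self.loop.close()

    def stop(self):
        # The only cross-thread call, and it is thread-safe.
        self.loop.call_soon_threadsafe(self.loop.stop)


worker = LoopThread()
worker.start()
worker.stop()
worker.join(timeout=5)
```

With this layout the test from the question reduces to start(), stop(), join(), followed by the same two assertions.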
Upvotes: 2