Reputation: 8567
I've played around with threading before in Python, but decided to give the asyncio module a try, especially since you can cancel a running task, which seemed like a nice detail. However, for some reason, I can't wrap my head around it.
Here's what I wanted to implement (sorry if I'm using incorrect terminology):
- A downloader thread that downloads the same file every x seconds, checks its hash against the previous download and saves it if it's different.
- A webserver thread that runs in the background, allowing control (pause, list, stop) of the downloader thread.

I used aiohttp for the webserver.
This is what I have so far:
class aiotest():

    def __init__(self):
        self._dl = None      # downloader future
        self._webapp = None  # web server future
        self.init_server()

    def init_server(self):
        print('Setting up web interface')
        app = web.Application()
        app.router.add_route('GET', '/stop', self.stop)
        print('added urls')
        self._webapp = app

    @asyncio.coroutine
    def _downloader(self):
        while True:
            try:
                print('Downloading and verifying file...')
                # Dummy sleep - to be replaced by actual code
                yield from asyncio.sleep(random.randint(3,10))
                # Wait a predefined nr of seconds between downloads
                yield from asyncio.sleep(30)
            except asyncio.CancelledError:
                break

    @asyncio.coroutine
    def _supervisor(self):
        print('Starting downloader')
        self._dl = asyncio.async(self._downloader())

    def start(self):
        loop = asyncio.get_event_loop()
        loop.run_until_complete(self._supervisor())
        loop.close()

    @asyncio.coroutine
    def stop(self):
        print('Received STOP')
        self._dl.cancel()
        return web.Response(body=b"Stopping... ")
This class is called by:
t = aiotest()
t.start()
This doesn't work of course, and I feel that this is a horrible piece of code.
What's unclear to me:
1. I stop the downloader in the stop() method, but how would I go about stopping the webserver (e.g. in a shutdown() method)?
2. Does the downloader need a new event loop, or can I use the loop returned by asyncio.get_event_loop()?
3. Do I really need something like the supervisor for what I'm trying to implement? This seems so clunky. And how do I get supervisor to keep running instead of ending after a single execution as it does now?

One last, more general question: is asyncio supposed to replace the threading module (in the future)? Or does each have its own application?
I appreciate all the pointers, remarks and clarifications!
Upvotes: 4
Views: 1112
Reputation: 39606
Why the current code is not working:
- You're running the event loop until self._supervisor() is complete. self._supervisor() creates the task (this happens immediately) and then finishes immediately.
- You're trying to run the event loop until _supervisor completes, but how and when are you going to start the server? I think the event loop should keep running until the server is stopped. _supervisor or other stuff can be added as a task (to the same event loop). aiohttp already has a function that starts both the server and the event loop, web.run_app, but we can do it manually.
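To see the first point in isolation, here is a minimal sketch that uses only asyncio, without aiohttp. The names supervisor, background and demo are made up for illustration, and it assumes Python 3.7+ (async/await syntax, asyncio.all_tasks). The supervisor returns as soon as it has scheduled the task, so run_until_complete returns long before the background work is done:

```python
import asyncio


async def background(events):
    # Stand-in for the downloader; it needs 0.1 s to finish.
    events.append('background started')
    await asyncio.sleep(0.1)
    events.append('background finished')


async def supervisor(events):
    # Scheduling a task returns at once; nothing here waits for it.
    asyncio.ensure_future(background(events))
    events.append('supervisor done')


def demo():
    events = []
    loop = asyncio.new_event_loop()
    try:
        # Returns as soon as supervisor() itself has returned.
        loop.run_until_complete(supervisor(events))
        # The background task is still pending; cancel it so the loop
        # can be closed cleanly.
        pending = asyncio.all_tasks(loop)
        for task in pending:
            task.cancel()
        loop.run_until_complete(
            asyncio.gather(*pending, return_exceptions=True))
    finally:
        loop.close()
    return events


if __name__ == '__main__':
    print(demo())
```

The list returned by demo() never contains 'background finished', which is the same effect as the downloader in the question never getting a chance to run.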
Your questions:
1. Your server will run until you stop it. You can start/stop different coroutines while your server is working.
2. You need only one event loop for different coroutines.
3. I think you don't need supervisor.
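As a rough illustration of the "one event loop" point, the sketch below runs two independent coroutines on a single loop. The worker function, its names and its delays are hypothetical, and it assumes Python 3.7+ for asyncio.run:

```python
import asyncio


async def worker(name, delay, log):
    # Each worker yields control to the loop while it sleeps,
    # which lets the other worker make progress in the meantime.
    for _ in range(2):
        await asyncio.sleep(delay)
        log.append(name)


async def main():
    log = []
    # Both coroutines are driven by the same event loop.
    await asyncio.gather(worker('a', 0.01, log), worker('b', 0.015, log))
    return log


def demo():
    return asyncio.run(main())


if __name__ == '__main__':
    print(demo())
```

Neither worker needs its own loop or its own thread; the web server and the downloader can share a loop in the same way.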
More general question: asyncio helps you run different functions concurrently in a single thread in a single process. That's why asyncio is so cool and fast. Some of your sync code that uses threads can be rewritten using asyncio and its coroutines. Moreover, asyncio can interact with threads and processes.
That can be useful in case you still need threads and processes: here's an example.
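One way asyncio can hand work to a thread is loop.run_in_executor. The following is only a sketch under a few assumptions: blocking_hash is a made-up stand-in for blocking work (here, hashing a downloaded payload), the payload is dummy data, and Python 3.7+ is assumed for asyncio.run and asyncio.get_running_loop:

```python
import asyncio
import hashlib


def blocking_hash(data):
    # Ordinary synchronous function; it would block the event loop
    # if it were called directly from a coroutine.
    return hashlib.sha256(data).hexdigest()


async def main():
    loop = asyncio.get_running_loop()
    # Passing None selects the loop's default thread pool executor.
    digest = await loop.run_in_executor(None, blocking_hash, b'example payload')
    return digest


def demo():
    return asyncio.run(main())


if __name__ == '__main__':
    print(demo())
```

While the thread computes the hash, the event loop stays free to serve other coroutines such as the web handlers.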
Useful notes:
- Use the term coroutine instead of thread when talking about asyncio coroutines, which are not threads.
- Prefer the async/await syntax over coroutine/yield from.
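For reference, a start/cancel cycle written with async/await might look roughly like this. It is a sketch rather than a drop-in replacement: the log list, the interval parameter and the short sleeps are placeholders chosen for the example, and Python 3.7+ is assumed:

```python
import asyncio
from contextlib import suppress


async def downloader(log, interval=0.01):
    # Placeholder for the real download-and-verify loop.
    while True:
        log.append('download')
        await asyncio.sleep(interval)


async def main():
    log = []
    task = asyncio.ensure_future(downloader(log))
    await asyncio.sleep(0.05)      # let a few iterations run
    task.cancel()                  # only requests cancellation
    with suppress(asyncio.CancelledError):
        await task                 # wait until the task has really stopped
    return log, task.cancelled()


def demo():
    return asyncio.run(main())


if __name__ == '__main__':
    print(demo())
```

Awaiting the task after cancel() matters: cancel() only schedules a CancelledError inside the coroutine, and the await gives it the chance to unwind.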
I rewrote your code to show you the idea. How to check it: run the program, watch the console, open http://localhost:8080/stop, watch the console, open http://localhost:8080/start, watch the console, then press CTRL+C.
import asyncio
import random
from contextlib import suppress
from aiohttp import web


class aiotest():

    def __init__(self):
        self._webapp = None
        self._d_task = None
        self.init_server()

    # SERVER:
    def init_server(self):
        app = web.Application()
        app.router.add_route('GET', '/start', self.start)
        app.router.add_route('GET', '/stop', self.stop)
        app.router.add_route('GET', '/kill_server', self.kill_server)
        self._webapp = app

    def run_server(self):
        # Create server:
        loop = asyncio.get_event_loop()
        handler = self._webapp.make_handler()
        f = loop.create_server(handler, '0.0.0.0', 8080)
        srv = loop.run_until_complete(f)
        try:
            # Start downloader at server start.
            # For brevity the request handlers are called directly here
            # and below; it's better to split the handlers from the
            # actual start/stop functions.
            asyncio.ensure_future(self.start(None))
            # Start server:
            loop.run_forever()
        except KeyboardInterrupt:
            pass
        finally:
            # Stop downloader when server stopped:
            loop.run_until_complete(self.stop(None))
            # Cleanup resources:
            srv.close()
            loop.run_until_complete(srv.wait_closed())
            loop.run_until_complete(self._webapp.shutdown())
            loop.run_until_complete(handler.finish_connections(60.0))
            loop.run_until_complete(self._webapp.cleanup())
        loop.close()

    @asyncio.coroutine
    def kill_server(self, request):
        print('Server killing...')
        loop = asyncio.get_event_loop()
        loop.stop()
        return web.Response(body=b"Server killed")

    # DOWNLOADER
    @asyncio.coroutine
    def start(self, request):
        if self._d_task is None:
            print('Downloader starting...')
            self._d_task = asyncio.ensure_future(self._downloader())
            return web.Response(body=b"Downloader started")
        else:
            return web.Response(body=b"Downloader already started")

    @asyncio.coroutine
    def stop(self, request):
        if (self._d_task is not None) and (not self._d_task.cancelled()):
            print('Downloader stopping...')
            self._d_task.cancel()
            # cancel() only tells the task that it should be cancelled;
            # wait for the task so it can handle CancelledError.
            with suppress(asyncio.CancelledError):
                yield from self._d_task
            self._d_task = None
            return web.Response(body=b"Downloader stopped")
        else:
            return web.Response(body=b"Downloader already stopped or stopping")

    @asyncio.coroutine
    def _downloader(self):
        while True:
            print('Downloading and verifying file...')
            # Dummy sleep - to be replaced by actual code
            yield from asyncio.sleep(random.randint(1, 2))
            # Wait a predefined nr of seconds between downloads
            yield from asyncio.sleep(1)


if __name__ == '__main__':
    t = aiotest()
    t.run_server()
Upvotes: 4