DocZerø
DocZerø

Reputation: 8567

Trying to implement 2 "threads" using `asyncio` module

I've played around with threading before in Python, but decided to give the asyncio module a try, especially since you can cancel a running task, which seemed like a nice detail. However, for some reason, I can't wrap my head around it.

Here's what I wanted to implement (sorry if I'm using incorrect terminology):

I used aiohttp for the webserver.

This is what I have so far:

class aiotest():

    def __init__(self):
        self._dl = None     # downloader future
        self._webapp = None # web server future
        self.init_server()

    def init_server(self):

        print('Setting up web interface')
        app = web.Application()
        app.router.add_route('GET', '/stop', self.stop)
        print('added urls')
        self._webapp = app

    @asyncio.coroutine
    def _downloader(self):
        while True:
            try:
                print('Downloading and verifying file...')
                # Dummy sleep - to be replaced by actual code
                yield from asyncio.sleep(random.randint(3,10))
                # Wait a predefined nr of seconds between downloads
                yield from asyncio.sleep(30)
            except asyncio.CancelledError:
                break

    @asyncio.coroutine
    def _supervisor(self):

        print('Starting downloader')
        self._dl = asyncio.async(self._downloader())

    def start(self):
        loop = asyncio.get_event_loop()
        loop.run_until_complete(self._supervisor())
        loop.close()

    @asyncio.coroutine
    def stop(self):
        print('Received STOP')
        self._dl.cancel()
        return web.Response(body=b"Stopping... ")

This class is called by:

t = aiotest()
t.start()

This doesn't work of course, and I feel that this is a horrible piece of code.

What's unclear to me:

One last, more general question: is asyncio supposed to replace the threading module (in the future)? Or does each have its own application?

I appreciate all the pointers, remarks and clarifications!

Upvotes: 4

Views: 1112

Answers (1)

Mikhail Gerasimov
Mikhail Gerasimov

Reputation: 39606

Why current code is not working:

  • You're running event loop until self._supervisor() is complete. self._supervisor() creates task (it happens immediately) and finishes immediately.

  • You're trying to run event loop until _supervisor complete, but how and when are you going start server? I think event loop should be running until server stopped. _supervisor or other stuff can be added as task (to same event loop). aiohttp already has function to start server and event loop - web.run_app, but we can do it manually.

Your questions:

  1. Your server will run until you stop it. You can start/stop different coroutines while your server working.

  2. You need only one event loop for different coroutines.

  3. I think you don't need supervisor.

  4. More general question: asyncio helps you to run different functions parallel in single thread in single process. That's why asyncio is so cool and fast. Some of your sync code with threads you can rewrite using asyncio and it's coroutines. Moreover: asyncio can interact with threads and processes. It can be useful in case you still need threads and processes: here's example.

Useful notes:

  • It's better to use term coroutine instead of thread while we talk about asyncio coroutines that are not threads
  • If you use Python 3.5, you can use async/await syntax instead of coroutine/yield from

I rewrote your code to show you idea. How to check it: run program, see console, open http://localhost:8080/stop, see console, open http://localhost:8080/start, see console, type CTRL+C.

import asyncio
import random
from contextlib import suppress

from aiohttp import web


class aiotest():
    def __init__(self):
        self._webapp = None
        self._d_task = None
        self.init_server()

    # SERVER:
    def init_server(self):
        app = web.Application()
        app.router.add_route('GET', '/start', self.start)
        app.router.add_route('GET', '/stop', self.stop)
        app.router.add_route('GET', '/kill_server', self.kill_server)
        self._webapp = app

    def run_server(self):
        # Create server:
        loop = asyncio.get_event_loop()
        handler = self._webapp.make_handler()
        f = loop.create_server(handler, '0.0.0.0', 8080)
        srv = loop.run_until_complete(f)
        try:
            # Start downloader at server start:
            asyncio.async(self.start(None))  # I'm using controllers here and below to be short,
                                             # but it's better to split controller and start func
            # Start server:
            loop.run_forever()
        except KeyboardInterrupt:
            pass
        finally:
            # Stop downloader when server stopped:
            loop.run_until_complete(self.stop(None))
            # Cleanup resources:
            srv.close()
            loop.run_until_complete(srv.wait_closed())
            loop.run_until_complete(self._webapp.shutdown())
            loop.run_until_complete(handler.finish_connections(60.0))
            loop.run_until_complete(self._webapp.cleanup())
        loop.close()

    @asyncio.coroutine
    def kill_server(self, request):
        print('Server killing...')
        loop = asyncio.get_event_loop()
        loop.stop()
        return web.Response(body=b"Server killed")

    # DOWNLOADER
    @asyncio.coroutine
    def start(self, request):
        if self._d_task is None:
            print('Downloader starting...')
            self._d_task = asyncio.async(self._downloader())
            return web.Response(body=b"Downloader started")
        else:
            return web.Response(body=b"Downloader already started")

    @asyncio.coroutine
    def stop(self, request):
        if (self._d_task is not None) and (not self._d_task.cancelled()):
            print('Downloader stopping...')
            self._d_task.cancel()            
            # cancel() just say task it should be cancelled
            # to able task handle CancelledError await for it
            with suppress(asyncio.CancelledError):
                yield from self._d_task
            self._d_task = None
            return web.Response(body=b"Downloader stopped")
        else:
            return web.Response(body=b"Downloader already stopped or stopping")

    @asyncio.coroutine
    def _downloader(self):
        while True:
            print('Downloading and verifying file...')
            # Dummy sleep - to be replaced by actual code
            yield from asyncio.sleep(random.randint(1, 2))
            # Wait a predefined nr of seconds between downloads
            yield from asyncio.sleep(1)


if __name__ == '__main__':
    t = aiotest()
    t.run_server()

Upvotes: 4

Related Questions