Uno

Reputation: 61

Convert a simple multithreaded program with asyncio

I am still fairly new to Python asyncio, so I am trying to convert a simple problem that I solved with multithreading to use asyncio instead.

I made an example of what I want to achieve. Each MiniBot instance can start at a random time (the time.sleep() calls in main stand in for instantiations at unpredictable times).

I expect the MiniBots to run in parallel whenever one starts before the others have finished.

All is well with multithreading, but when I translated the problem to async coroutines, I couldn't get them to run together.

I could use gather, but that would require having all the tasks at the beginning, which I don't. Any suggestions?

Thanks

Oh yes, I am using Python 3.6

Multithreaded version

import threading
import time

class MiniBot(object):
    def __init__(self, _id:str):
        self._id = _id
        self.alive = True
        self.start_time = time.time()
        self.th = threading.Thread(target=self.run)

        self.internal_state = 0


    def _foo(self):
        self.internal_state += 1


    def run(self):
        while self.alive:
            self._foo()

            if time.time() - self.start_time > 4:
                print(f"Killing minibot: {self._id}")
                print(f"Var is: {self.internal_state}")
                self.stop()
            time.sleep(0.1)


    def start(self):
        print(f"Starting Minibot {self._id}")
        self.th.start()


    def stop(self):
        self.alive = False


if __name__ == "__main__":
    # MiniBots activities start at random times but should coexist
    MiniBot('a').start()
    time.sleep(2)
    MiniBot('b').start()
    time.sleep(1.5)
    MiniBot('c').start()

Output:

Starting Minibot a
Starting Minibot b
Starting Minibot c
Killing minibot: a
Var is: 40
Killing minibot: b
Var is: 40
Killing minibot: c
Var is: 40

Async version (Not behaving as I hoped)

import asyncio
import time

class MiniBot(object):
    def __init__(self, _id:str):
        self._id = _id
        self.alive = True
        self.last_event = time.time()

        self.internal_state = 0


    async def _foo(self):
        self.internal_state += 1
        asyncio.sleep(2)


    async def run(self):
        while self.alive:
            await self._foo()

            if time.time() - self.last_event > 4:
                print(f"Killing minibot: {self._id}")
                print(f"Var is: {self.internal_state}")
                self.stop()
            asyncio.sleep(0.1)


    def start(self):
        print(f"Starting Minibot {self._id}")
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            loop.run_until_complete(self.run())
        finally:
            loop.run_until_complete(loop.shutdown_asyncgens())
            loop.close()


    def stop(self):
        self.alive = False


if __name__ == "__main__":
    # MiniBots activities start at random times but should coexist
    MiniBot('a').start()
    time.sleep(2)
    MiniBot('b').start()
    time.sleep(1.5)
    MiniBot('c').start()

Output:

Starting Minibot a
Killing minibot: a
Var is: 2839119
Starting Minibot b
Killing minibot: b
Var is: 2820634
Starting Minibot c
Killing minibot: c
Var is: 2804579

Upvotes: 3

Views: 1241

Answers (1)

user4815162342

Reputation: 155046

start() cannot call run_until_complete(), because run_until_complete() runs a coroutine through to the end, whereas you need multiple coroutines running concurrently. The asyncio equivalent of creating and starting a thread is asyncio.create_task() (added in Python 3.7; on Python 3.6, asyncio.ensure_future() does the same job), so start() should call that and return to the caller, as in the threaded version.

For example:

import asyncio, time

class MiniBot:
    def __init__(self, _id):
        self._id = _id
        self.alive = True
        self.start_time = time.time()
        self.internal_state = 0

    def _foo(self):
        self.internal_state += 1

    async def run(self):
        while self.alive:
            self._foo()
            if time.time() - self.start_time > 4:
                print(f"Killing minibot: {self._id}")
                print(f"Var is: {self.internal_state}")
                self.stop()
            await asyncio.sleep(0.1)

    def start(self):
        print(f"Starting Minibot {self._id}")
        # create_task() requires Python 3.7+; on 3.6 use asyncio.ensure_future()
        return asyncio.create_task(self.run())

    def stop(self):
        self.alive = False

async def main():
    taska = MiniBot('a').start()
    await asyncio.sleep(2)
    taskb = MiniBot('b').start()
    await asyncio.sleep(1.5)
    taskc = MiniBot('c').start()
    await asyncio.gather(taska, taskb, taskc)

if __name__ == "__main__":
    # On Python 3.7+ the line below can be replaced by asyncio.run(main())
    asyncio.get_event_loop().run_until_complete(main())

Don't let the call to gather() throw you off: gather() simply hands control back to the event loop and returns once all the provided tasks/futures have finished. In this example you could replace it with something like await asyncio.sleep(10) and get the same effect. And if you have an unpredictable number of futures, there are other ways to signal the end condition.
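One possible way to handle an unpredictable number of tasks is sketched below. It is only an illustration, not the only approach: the names worker, pending and results are made up for this example, worker stands in for MiniBot.run(), and the code assumes Python 3.7+ (on 3.6, asyncio.ensure_future() and loop.run_until_complete() would take the place of create_task() and asyncio.run()). Each task is added to a set when it starts and removes itself when it finishes, so main() can wait until the set is empty without knowing the tasks up front.

```python
import asyncio


async def worker(name, delay, results):
    # Hypothetical stand-in for MiniBot.run(): do some work, then report.
    await asyncio.sleep(delay)
    results.append(name)


async def main():
    results = []
    pending = set()  # tasks that have started but not yet finished

    # Tasks are started one at a time, at moments main() does not know
    # in advance; the short sleep imitates those unpredictable gaps.
    for name, delay in [("a", 0.05), ("b", 0.01), ("c", 0.02)]:
        task = asyncio.create_task(worker(name, delay, results))
        pending.add(task)
        # Each task removes itself from the set once it is done.
        task.add_done_callback(pending.discard)
        await asyncio.sleep(0.005)

    # End condition: keep waiting until no started task is still running.
    while pending:
        await asyncio.wait(pending)
    return results


if __name__ == "__main__":
    print(sorted(asyncio.run(main())))
```

The while loop matters if tasks can start further tasks while main() is waiting: asyncio.wait() only covers the tasks that were in the set when it was called, so the set is rechecked afterwards.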

Also note that you need to await asyncio.sleep(); otherwise it has no effect, because the bare call only creates a coroutine object that is never run.
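A small timing sketch makes the difference visible. The helper names (timed, sleep_not_awaited, sleep_awaited) are invented for this illustration and it assumes Python 3.7+ for asyncio.run(). The un-awaited call returns immediately, while the awaited one actually suspends the coroutine for the requested time.

```python
import asyncio
import time


async def timed(coro_fn):
    # Run coro_fn() to completion and return the elapsed wall-clock time.
    start = time.perf_counter()
    await coro_fn()
    return time.perf_counter() - start


async def sleep_not_awaited():
    # Without await this only builds a coroutine object; nothing sleeps.
    coro = asyncio.sleep(0.2)
    # Close it explicitly so Python does not warn that it was never awaited.
    coro.close()


async def sleep_awaited():
    # With await the coroutine yields to the event loop for about 0.2 s.
    await asyncio.sleep(0.2)


async def main():
    skipped = await timed(sleep_not_awaited)
    slept = await timed(sleep_awaited)
    return skipped, slept


if __name__ == "__main__":
    skipped, slept = asyncio.run(main())
    print(f"not awaited: {skipped:.3f}s, awaited: {slept:.3f}s")
```

This is also why the loop in the question's async version spins millions of times: with neither sleep awaited, run() never yields to the event loop.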

Upvotes: 1
