ImapUkua
ImapUkua

Reputation: 81

Limiting simultaneously running asyncio coroutines with semaphores in a background thread

As an experiment with Python's (then new) asyncio module, I wrote the following snippet to process a set of long-running actions (jobs) in a background worker thread.

To limit the number of simultaneously running jobs, I introduced a semaphore in a `with` block (in `Job._process`). However, with the semaphore in place, the acquired slots never seem to be released: after the first jobs complete (their callbacks are executed), the waiting jobs never start. When I remove the `with` block, everything works as expected.

import asyncio

from queue import Queue, Empty
from datetime import datetime
from threading import Thread


class BackgroundWorker(Thread):
    """Thread that runs submitted coroutine functions on its own asyncio event loop.

    This is the version from the question and still contains the reported bug:
    ``limit_simultaneous_processes`` is created in ``__init__``, i.e. in the main
    thread, before ``run()`` installs the worker thread's event loop.
    """
    def __init__(self):
        super().__init__()
        self._keep_running = True
        self._waiting_coros = Queue()    # Thread-safe hand-off of (coroutine function, callback) pairs.
        self._tasks = []                 # Every task started by process_coros(); never pruned.
        self._loop = None    # Loop must be initialized in child thread.
        # BUG (the one asked about): __init__ runs in the main thread. Before Python 3.10 a
        # Semaphore binds to the event loop that is current when it is constructed, i.e. the
        # main thread's implicit loop, which is never run here. So once both slots are taken,
        # a waiting job is never woken when another job releases its slot.
        self.limit_simultaneous_processes = asyncio.Semaphore(2)

    def stop(self):
        """Ask process_coros() to exit; it notices after its current 3-second sleep."""
        self._keep_running = False

    def run(self):
        """Thread body: create and install this thread's loop, then process submissions."""
        self._loop = asyncio.new_event_loop()       # Implicit creation of the loop only happens in the main thread.
        asyncio.set_event_loop(self._loop)          # Since this is a child thread, we need to do it manually.
        self._loop.run_until_complete(self.process_coros())

    def submit_coro(self, coro, callback=None):
        """Queue ``coro`` (a zero-argument coroutine function) and an optional done-callback."""
        self._waiting_coros.put((coro, callback))

    @asyncio.coroutine
    def process_coros(self):
        """Every 3 seconds, start all queued jobs as tasks, until stop() is called."""
        while self._keep_running:
            try:
                # Drain the queue completely; get_nowait() raises Empty when it is exhausted.
                while True:
                    coro, callback = self._waiting_coros.get_nowait()
                    # asyncio.async is the old name of asyncio.ensure_future (renamed in 3.4.4);
                    # it is a SyntaxError from Python 3.7 on, where `async` is a keyword.
                    task = asyncio.async(coro())
                    if callback:
                        task.add_done_callback(callback)
                    self._tasks.append(task)
            except Empty as e:
                pass
            yield from asyncio.sleep(3)     # sleep so the other tasks can run


# Module-level worker shared by Job and main(); constructed (in the main thread) at import time.
background_worker = BackgroundWorker()


class Job(object):
    """A demo unit of work that runs on the module-level ``background_worker``."""
    def __init__(self, idx):
        super().__init__()
        self._idx = idx    # Identifier used only in the printed messages.

    def process(self):
        """Hand this job to the background worker; returns immediately."""
        background_worker.submit_coro(self._process, self._process_callback)

    @asyncio.coroutine
    def _process(self):
        """Hold one semaphore slot for about two seconds and print the elapsed time."""
        # This is the `with` block the question refers to. The semaphore it acquires was
        # created by BackgroundWorker.__init__ in the main thread. Note: @asyncio.coroutine
        # and `with (yield from lock)` are pre-async/await syntax, removed in Python 3.11 and
        # 3.9 respectively.
        with (yield from background_worker.limit_simultaneous_processes):
            print("received processing slot %d" % self._idx)
            start = datetime.now()
            yield from asyncio.sleep(2)
            print("processing %d took %s" % (self._idx, str(datetime.now() - start)))

    def _process_callback(self, future):
        """Done-callback attached by submit_coro(); ``future`` is the finished task."""
        print("callback %d triggered" % self._idx)


def main():
    """Start the worker, queue ten jobs, then block until the user types 'quit'."""
    print("starting worker...")
    background_worker.start()

    for idx in range(10):
        download_job = Job(idx)
        download_job.process()

    # Keep the main thread busy (and the worker running) until the user asks to quit.
    command = None
    while command != "quit":
        command = input("enter 'quit' to stop the program: \n")

    print("stopping...")
    background_worker.stop()
    background_worker.join()    # Returns once process_coros() finishes its current sleep.


# Only start the worker and prompt when run as a script, not on import.
if __name__ == '__main__':
    main()

Can anyone shed some light on this behavior? Why isn't the semaphore released (incremented) when the `with` block exits?

Upvotes: 3

Views: 2410

Answers (1)

ImapUkua
ImapUkua

Reputation: 81

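I found the bug: the semaphore is created in `__init__`, which runs in the main thread. It is therefore bound to the main thread's implicit event loop, not to the loop that `run()` creates and sets for the worker thread. Because the main thread's loop never runs, a job waiting on the semaphore is never woken when another job releases it. The fix is to create the semaphore in `run()`, after `asyncio.set_event_loop()`. (This applies to asyncio before Python 3.10; since 3.10, asyncio synchronization primitives bind to the running loop on first use, so creating them in advance is safe.)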

Fixed version:

import asyncio

from queue import Queue, Empty
from datetime import datetime
from threading import Thread


class BackgroundWorker(Thread):
    """Thread that runs submitted coroutine functions on its own asyncio event loop.

    Other threads hand work over with ``submit_coro``; the worker picks it up
    every ``poll_interval`` seconds. Jobs that want to be throttled acquire
    ``limit_simultaneous_processes``, a semaphore with ``max_concurrent`` slots
    that exists once the thread has started.

    Args:
        max_concurrent: number of slots in ``limit_simultaneous_processes`` (default 2).
        poll_interval: seconds to sleep between checks of the submission queue (default 3).
    """

    def __init__(self, max_concurrent=2, poll_interval=3):
        super().__init__()
        self._keep_running = True
        self._waiting_coros = Queue()       # Thread-safe hand-off from submit_coro() callers.
        self._tasks = []                    # Started tasks that may still be running.
        self._max_concurrent = max_concurrent
        self._poll_interval = poll_interval
        self._loop = None                           # Loop must be created in the child thread (see run()).
        self.limit_simultaneous_processes = None    # Semaphore must be created after that loop is set.

    def stop(self):
        """Ask the worker to shut down; it notices within one ``poll_interval``."""
        self._keep_running = False

    def run(self):
        """Thread body: create this thread's event loop and process submissions until stopped."""
        # Only the main thread gets an implicit event loop, so a child thread
        # has to create and install its own.
        self._loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self._loop)
        # Created here rather than in __init__: before Python 3.10 a semaphore binds to
        # the loop that is current at construction time, and __init__ runs in the main thread.
        self.limit_simultaneous_processes = asyncio.Semaphore(self._max_concurrent)
        try:
            self._loop.run_until_complete(self.process_coros())
        finally:
            self._loop.close()

    def submit_coro(self, coro, callback=None):
        """Queue a job for the worker; may be called from any thread.

        Args:
            coro: zero-argument coroutine function, called on the worker's loop.
            callback: optional done-callback, called with the finished task.
        """
        self._waiting_coros.put((coro, callback))

    async def process_coros(self):
        """Start queued jobs every ``poll_interval`` seconds until ``stop()`` is called.

        On shutdown, jobs that are still running are cancelled and awaited, so the
        loop can be closed without destroying pending tasks.
        """
        while self._keep_running:
            self._start_waiting_coros()
            # Forget finished tasks so the list does not grow without bound.
            self._tasks = [task for task in self._tasks if not task.done()]
            await asyncio.sleep(self._poll_interval)    # yield so the started tasks can run
        await self._cancel_pending_tasks()

    def _start_waiting_coros(self):
        """Schedule every job currently in the queue as a task on the running loop."""
        while True:
            try:
                coro, callback = self._waiting_coros.get_nowait()
            except Empty:
                return
            task = asyncio.ensure_future(coro())
            if callback:
                task.add_done_callback(callback)
            self._tasks.append(task)

    async def _cancel_pending_tasks(self):
        """Cancel tasks that are still running and wait until they have finished."""
        pending = [task for task in self._tasks if not task.done()]
        for task in pending:
            task.cancel()
        if pending:
            # return_exceptions=True collects the CancelledErrors instead of raising here.
            await asyncio.gather(*pending, return_exceptions=True)
        self._tasks = []


# Module-level worker shared by Job and main().
background_worker = BackgroundWorker()


class Job(object):
    """A demo unit of work that runs on the module-level ``background_worker``."""

    def __init__(self, idx):
        super().__init__()
        self._idx = idx    # Identifier used only in the printed messages.

    def process(self):
        """Hand this job to the background worker; returns immediately."""
        background_worker.submit_coro(self._process, self._process_callback)

    async def _process(self):
        """Hold one processing slot for about two seconds and print the elapsed time."""
        # `async with` replaces `with (yield from semaphore)`, which was removed in
        # Python 3.9; the slot is released when the block exits, even on error.
        async with background_worker.limit_simultaneous_processes:
            print("received processing slot %d" % self._idx)
            start = datetime.now()
            await asyncio.sleep(2)
            print("processing %d took %s" % (self._idx, str(datetime.now() - start)))

    def _process_callback(self, future):
        """Done-callback attached by submit_coro(); ``future`` is the finished task."""
        print("callback %d triggered" % self._idx)


def main():
    """Start the worker, queue ten jobs, then block until the user types 'quit'.

    The worker is always stopped and joined on the way out, including when
    ``input()`` raises (end of stdin, Ctrl+C).
    """
    print("starting worker...")
    background_worker.start()
    try:
        for idx in range(10):
            download_job = Job(idx)
            download_job.process()

        command = None
        while command != "quit":
            command = input("enter 'quit' to stop the program: \n")
    finally:
        # The worker is a non-daemon thread that loops until stop() is called; without
        # this, an exception above would leave the interpreter waiting on it forever.
        print("stopping...")
        background_worker.stop()
        background_worker.join()


# Only start the worker and prompt when run as a script, not on import.
if __name__ == '__main__':
    main()

Upvotes: 4

Related Questions