Reputation: 81
As an experiment with Python's new asyncio module, I created the following snippet to process a set of long running actions (jobs) in a background worker.
In an attempt to control the number of simultaneously running jobs, I introduced a semaphore in a with block (in `Job._process`). However, with the semaphore in place, it seems that the acquired locks are never released: after the first jobs complete (their callbacks are executed), the waiting jobs don't start. When I ditch the with block, everything works as expected.
import asyncio
from queue import Queue, Empty
from datetime import datetime
from threading import Thread
class BackgroundWorker(Thread):
    """Thread that creates its own asyncio event loop and runs submitted coroutines on it.

    Work is handed over through a thread-safe ``queue.Queue`` and picked up by
    ``process_coros()``, which polls the queue from inside the loop.
    """

    def __init__(self):
        super().__init__()
        self._keep_running = True
        self._waiting_coros = Queue()
        self._tasks = []
        self._loop = None # Loop must be initialized in child thread.
        # NOTE: this semaphore is constructed by whichever thread instantiates
        # the worker, i.e. before run() creates and installs the worker's own
        # loop. The author's follow-up further down identifies this as the
        # reason the waiting jobs never start.
        self.limit_simultaneous_processes = asyncio.Semaphore(2)

    def stop(self):
        """Clear the run flag; process_coros() re-checks it once per polling pass."""
        self._keep_running = False

    def run(self):
        """Thread entry point: create a loop, install it for this thread, run process_coros()."""
        self._loop = asyncio.new_event_loop() # Implicit creation of the loop only happens in the main thread.
        asyncio.set_event_loop(self._loop) # Since this is a child thread, we need to do it manually.
        self._loop.run_until_complete(self.process_coros())

    def submit_coro(self, coro, callback=None):
        """Queue a coroutine function (called later with no arguments) and an optional done-callback."""
        self._waiting_coros.put((coro, callback))

    @asyncio.coroutine
    def process_coros(self):
        """Turn every queued coroutine function into a task until stop() is called."""
        while self._keep_running:
            try:
                # Drain everything currently queued; get_nowait() raises Empty
                # once the queue is exhausted, which ends the inner loop.
                while True:
                    coro, callback = self._waiting_coros.get_nowait()
                    task = asyncio.async(coro())
                    if callback:
                        task.add_done_callback(callback)
                    self._tasks.append(task)
            except Empty as e:
                pass
            yield from asyncio.sleep(3) # sleep so the other tasks can run
# Module-level worker shared by every Job. Instantiating it here runs
# BackgroundWorker.__init__ at import time, in the importing thread.
background_worker = BackgroundWorker()
class Job(object):
    """A numbered unit of work that runs on the module-level ``background_worker``."""

    def __init__(self, idx):
        super().__init__()
        self._idx = idx

    def process(self):
        """Queue this job's coroutine and completion callback on the background worker."""
        background_worker.submit_coro(self._process, self._process_callback)

    @asyncio.coroutine
    def _process(self):
        """Wait for a slot on the worker's semaphore, then simulate two seconds of work."""
        # This is the `with` block the question is about: the semaphore is
        # acquired via `yield from` and is expected to be released on exit.
        with (yield from background_worker.limit_simultaneous_processes):
            print("received processing slot %d" % self._idx)
            start = datetime.now()
            yield from asyncio.sleep(2)
            print("processing %d took %s" % (self._idx, str(datetime.now() - start)))

    def _process_callback(self, future):
        """Done-callback attached to the job's task; only reports that it fired."""
        print("callback %d triggered" % self._idx)
def main():
    """Start the worker, submit ten jobs, then wait on stdin until the user types 'quit'."""
    print("starting worker...")
    background_worker.start()
    for idx in range(10):
        download_job = Job(idx)
        download_job.process()
    # Keep the main thread alive; the jobs run in the worker thread meanwhile.
    command = None
    while command != "quit":
        command = input("enter 'quit' to stop the program: \n")
    print("stopping...")
    background_worker.stop()
    background_worker.join()


if __name__ == '__main__':
    main()
Can anyone help me shed some light on this behavior? Why isn't the semaphore incremented when the with block is exited?
Upvotes: 3
Views: 2410
Reputation: 81
I discovered the bug: the semaphore is initialized with the implicit event loop from the main thread, not the one explicitly set when the thread is started with run().
Fixed version:
import asyncio
from queue import Queue, Empty
from datetime import datetime
from threading import Thread
class BackgroundWorker(Thread):
    """Background thread that owns an asyncio event loop and runs submitted jobs on it.

    Jobs are handed over from other threads through a thread-safe queue and
    are turned into tasks by ``process_coros()``, which polls that queue from
    inside the loop.

    Args:
        max_simultaneous: Initial value of ``limit_simultaneous_processes``,
            the semaphore jobs can use to cap how many run at once.
        poll_interval: Seconds ``process_coros()`` sleeps between two looks at
            the queue. This is also the worst-case delay before ``stop()``
            takes effect.
    """

    def __init__(self, max_simultaneous=2, poll_interval=3):
        super().__init__()
        self._keep_running = True
        self._waiting_coros = Queue()
        self._tasks = []
        self._max_simultaneous = max_simultaneous
        self._poll_interval = poll_interval
        self._loop = None  # Loop must be initialized in child thread.
        # Semaphore must be initialized after the loop is set (see run()).
        self.limit_simultaneous_processes = None

    def stop(self):
        """Ask the worker to finish; honoured at the next polling pass."""
        self._keep_running = False

    def run(self):
        """Thread entry point: set up the loop and semaphore, then poll until stopped."""
        # Implicit creation of the loop only happens in the main thread, so
        # this child thread has to create and install its own.
        self._loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self._loop)
        # Created only now, in the worker thread with its loop installed, so
        # the semaphore is associated with the loop the jobs actually run on.
        self.limit_simultaneous_processes = asyncio.Semaphore(self._max_simultaneous)
        try:
            self._loop.run_until_complete(self.process_coros())
        finally:
            # Release the loop's resources (selector, self-pipe) on exit.
            self._loop.close()

    def submit_coro(self, coro, callback=None):
        """Queue a job for execution on the worker's loop.

        Args:
            coro: Zero-argument callable returning a coroutine (or future).
            callback: Optional callable invoked with the finished task.
        """
        self._waiting_coros.put((coro, callback))

    async def process_coros(self):
        """Schedule queued jobs as tasks until ``stop()`` has been called."""
        while self._keep_running:
            self._schedule_waiting()
            # Sleep so the scheduled tasks get a chance to run.
            await asyncio.sleep(self._poll_interval)

    def _schedule_waiting(self):
        """Drain the queue, wrapping every queued job in a task."""
        while True:
            try:
                coro, callback = self._waiting_coros.get_nowait()
            except Empty:
                return
            # ensure_future replaces asyncio.async, which no longer parses
            # now that `async` is a reserved keyword.
            task = asyncio.ensure_future(coro(), loop=self._loop)
            if callback:
                task.add_done_callback(callback)
            self._tasks.append(task)
# Module-level worker shared by every Job. Only the thread object is built
# here; its event loop and semaphore are created later, inside run().
background_worker = BackgroundWorker()
class Job(object):
    """A numbered unit of work executed on the module-level ``background_worker``.

    Args:
        idx: Number used to identify this job in the printed progress messages.
    """

    def __init__(self, idx):
        super().__init__()
        self._idx = idx

    def process(self):
        """Queue this job's coroutine and completion callback on the background worker."""
        background_worker.submit_coro(self._process, self._process_callback)

    async def _process(self):
        """Wait for a free slot on the worker's semaphore, then simulate two seconds of work."""
        # `async with` replaces the `with (yield from semaphore)` form, which
        # current asyncio no longer supports; the slot is released on exit.
        async with background_worker.limit_simultaneous_processes:
            print("received processing slot %d" % self._idx)
            start = datetime.now()
            await asyncio.sleep(2)
            print("processing %d took %s" % (self._idx, str(datetime.now() - start)))

    def _process_callback(self, future):
        """Done-callback attached to the job's task; only reports that it fired."""
        print("callback %d triggered" % self._idx)
def main():
    """Start the worker, submit ten jobs, and wait on stdin until the user types 'quit'.

    The worker is always stopped and joined on the way out, including when
    stdin is closed or the prompt is interrupted. Otherwise the non-daemon
    worker thread would keep the process alive indefinitely.
    """
    print("starting worker...")
    background_worker.start()
    try:
        for idx in range(10):
            download_job = Job(idx)
            download_job.process()
        command = None
        while command != "quit":
            try:
                command = input("enter 'quit' to stop the program: \n")
            except EOFError:
                # stdin was closed: nothing more can be read, so treat it as 'quit'.
                break
    finally:
        print("stopping...")
        background_worker.stop()
        background_worker.join()


if __name__ == '__main__':
    main()
Upvotes: 4