Reputation: 15679
I have a thread that should wait for tasks to arrive from multiple other threads and execute them until no task is left. If no task is left, it should wait again.
I tried it with this class (only relevant code):
from threading import Event, Thread

class TaskExecutor(object):
    def __init__(self):
        self.event = Event()
        self.taskInfos = []
        self._running = True
        task_thread = Thread(target=self._run_worker_thread)
        self._running = True
        task_thread.daemon = True
        task_thread.start()

    def _run_worker_thread(self):
        while self.is_running():
            if len(self.taskInfos) == 0:
                self.event.clear()
                self.event.wait()
            try:
                msg, task = self.taskInfos[0]
                del self.taskInfos[0]
                if task:
                    task.execute(msg)
            except Exception, e:
                logger.error("Error " + str(e))

    def schedule_task(self, msg, task):
        self.taskInfos.append((msg, task))
        self.event.set()
Multiple threads call schedule_task whenever they want to add a task.
The problem is that I sometimes get an error saying list index out of range from the msg, task = self.taskInfos[0] line. The del self.taskInfos[0] below it is the only place where I delete a task.
How can that happen? I feel like I have to synchronize everything, but there is no such keyword in Python, and reading the docs brought up this pattern.
Upvotes: 0
Views: 69
Reputation: 35891
The following sequence is possible (assume Thread #0 is a consumer and runs your _run_worker_thread method, and threads Thread #1 and Thread #2 are producers and call the schedule_task method):

Thread #1 calls schedule_task, appends its task, and is preempted before its set call
Thread #2 calls schedule_task, appends its task, and reaches its set call
Thread #0 consumes both tasks without ever waiting, because the if len(self.taskInfos) == 0 check stays False
Thread #0 finds the list empty, clears the event and calls wait
Thread #1 resumes and makes its set call; wait returns at once, and msg, task = self.taskInfos[0] raises the IndexError because the list is still empty

Basically, the worker thread can consume all the tasks before all producers manage to set the event after appending to the queue.
Possible solutions include checking the condition again after wait, as suggested in the comments by xndrme, or using the Lock class, the best one probably being the Queue.Queue class mentioned by Tim Peters in his answer.
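For illustration, here is a minimal sketch of the Lock plus re-check variant, assuming the same class layout as in the question (the _lock attribute and the inner loop are my own additions, not code from the question; the Queue.Queue version in the other answer is still the simpler choice):

from threading import Event, Lock, Thread

class TaskExecutor(object):
    def __init__(self):
        self.event = Event()
        self.taskInfos = []
        self._lock = Lock()  # guards taskInfos and the clear/set decisions
        self._running = True
        task_thread = Thread(target=self._run_worker_thread)
        task_thread.daemon = True
        task_thread.start()

    def _run_worker_thread(self):
        while self._running:
            self.event.wait()  # sleep until a producer signals
            while True:
                with self._lock:
                    if not self.taskInfos:
                        # re-check under the lock: only clear when the list
                        # is really empty, so no set call can be lost
                        self.event.clear()
                        break
                    msg, task = self.taskInfos.pop(0)
                if task:
                    task.execute(msg)

    def schedule_task(self, msg, task):
        with self._lock:
            self.taskInfos.append((msg, task))
            self.event.set()

Because append/set and the empty-check/clear both happen under the same lock, the event is never cleared while a task is queued, so a signal cannot be lost and wait cannot return to an empty list.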
Upvotes: 2
Reputation: 70602
This code is pretty hopeless - give up on it and do something sane ;-) What's sane? Use a Queue.Queue. That's designed to do what you want.
Replace:
self.event = Event()
self.taskInfos = []
with:
self.taskInfos = Queue.Queue()
(of course you have to import Queue too).
To add a task:
self.taskInfos.put((msg, task))
To get a task:
msg, task = self.taskInfos.get()
That will block until a task is available. There are also options to do a non-blocking .get() attempt, and to do a .get() attempt with a timeout (read the docs).
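Put together, a sketch of the class rebuilt around Queue.Queue might look like this (the logging setup is my own addition so the snippet is self-contained; on Python 3 the module is named queue rather than Queue):

import logging
import Queue  # on Python 3: import queue as Queue
from threading import Thread

logger = logging.getLogger(__name__)

class TaskExecutor(object):
    def __init__(self):
        self.taskInfos = Queue.Queue()  # thread-safe FIFO, no Event needed
        task_thread = Thread(target=self._run_worker_thread)
        task_thread.daemon = True
        task_thread.start()

    def _run_worker_thread(self):
        while True:
            # blocks until a task is available; no polling, no lost wakeups
            msg, task = self.taskInfos.get()
            try:
                if task:
                    task.execute(msg)
            except Exception as e:
                logger.error("Error " + str(e))

    def schedule_task(self, msg, task):
        self.taskInfos.put((msg, task))

All the synchronization lives inside Queue.Queue, so schedule_task can be called from any number of threads without any extra locking.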
Trying to fix the code you have would be a nightmare. At heart, Events are not powerful enough to do what you need for thread safety in this context. In fact, any time you see code doing Event.clear(), it's probably buggy (subject to races).
Edit: what will go wrong next
If you continue trying to fix this code, this is likely to happen next:
the queue is empty
thread 1 does len(self.taskInfos) == 0, and loses its timeslice
thread 2 does self.taskInfos.append((msg, task)), does self.event.set(), and loses its timeslice
thread 1 resumes, does self.event.clear(), and does self.event.wait()
Oops! Now thread 1 waits forever, despite that a task is on the queue.
That's why Python supplies Queue.Queue. You're exceedingly unlikely to get a correct solution using a feeble Event.
Upvotes: 3