Reputation: 1132
I have implemented a WorkerManager based on multiprocessing.Process and JoinableQueue. I try to handle process failures such as timeouts and unhandled exceptions after proc.join(timeout), evaluate proc.exitcode to decide how to handle them, and then call in_queue.task_done() to notify the queue that the job has been completed by the exception-handling logic. However, task_done() needs to be invoked twice. I have no idea why it has to be called twice. Could anyone figure out the reason?
The whole code snippet:
# -*- coding=utf-8 -*-
import time
import threading
from queue import Empty
from multiprocessing import Event, Process, JoinableQueue, cpu_count, current_process

TIMEOUT = 3


class WorkersManager(object):

    def __init__(self, jobs, processes_num):
        self._processes_num = processes_num if processes_num else cpu_count()
        self._workers_num = processes_num
        self._in_queue, self._run_queue, self._out_queue = JoinableQueue(), JoinableQueue(), JoinableQueue()
        self._spawned_procs = []
        self._total = 0
        self._stop_event = Event()
        self._jobs_on_procs = {}

        self._wk_kwargs = dict(
            in_queue=self._in_queue, run_queue=self._run_queue, out_queue=self._out_queue,
            stop_event=self._stop_event
        )

        self._in_stream = [j for j in jobs]
        self._out_stream = []
        self._total = len(self._in_stream)

    def run(self):
        # Spawn Worker
        worker_processes = [
            WorkerProcess(i, **self._wk_kwargs) for i in range(self._processes_num)
        ]
        self._spawned_procs = [
            Process(target=process.run, args=tuple())
            for process in worker_processes
        ]

        for p in self._spawned_procs:
            p.start()

        self._serve()

        monitor = threading.Thread(target=self._monitor, args=tuple())
        monitor.start()

        collector = threading.Thread(target=self._collect, args=tuple())
        collector.start()

        self._join_workers()
        # TODO: Terminate threads
        monitor.join(TIMEOUT)
        collector.join(TIMEOUT)

        self._in_queue.join()
        self._out_queue.join()
        return self._out_stream

    def _join_workers(self):
        for p in self._spawned_procs:
            p.join(TIMEOUT)

            if p.is_alive():
                p.terminate()
                job = self._jobs_on_procs.get(p.name)
                print('Process TIMEOUT: {0} {1}'.format(p.name, job))
                result = {
                    "status": "failed"
                }

                self._out_queue.put(result)
                for _ in range(2):
                    # NOTE: Call task_done twice
                    # Guessing:
                    # 1st time to switch process?
                    # 2nd time to notify that the task is done?
                    # TODO: figure out why?
                    self._in_queue.task_done()
            else:
                if p.exitcode == 0:
                    print("{} exit with code:{}".format(p, p.exitcode))
                else:
                    job = self._jobs_on_procs.get(p.name)
                    if p.exitcode > 0:
                        print("{} with code:{} {}".format(p, p.exitcode, job))
                    else:
                        print("{} been killed with code:{} {}".format(p, p.exitcode, job))

                    result = {
                        "status": "failed"
                    }

                    self._out_queue.put(result)
                    for _ in range(2):
                        # NOTE: Call task_done twice
                        # Guessing:
                        # 1st time to switch process?
                        # 2nd time to notify that the task is done?
                        # TODO: figure out why?
                        self._in_queue.task_done()

    def _collect(self):
        # TODO: Spawn a collector proc
        while True:
            try:
                r = self._out_queue.get()
                self._out_stream.append(r)
                self._out_queue.task_done()
                if len(self._out_stream) >= self._total:
                    print("Total {} jobs done.".format(len(self._out_stream)))
                    self._stop_event.set()
                    break
            except Empty:
                continue

    def _serve(self):
        for job in self._in_stream:
            self._in_queue.put(job)

        for _ in range(self._workers_num):
            self._in_queue.put(None)

    def _monitor(self):
        running = 0
        while True:
            proc_name, job = self._run_queue.get()
            running += 1
            self._jobs_on_procs.update({proc_name: job})
            self._run_queue.task_done()
            if running == self._total:
                break


class WorkerProcess(object):

    def __init__(self, worker_id, in_queue, run_queue, out_queue, stop_event):
        self._worker_id = worker_id
        self._in_queue = in_queue
        self._run_queue = run_queue
        self._out_queue = out_queue
        self._stop_event = stop_event

    def run(self):
        self._work()
        print('worker - {} quit'.format(self._worker_id))

    def _work(self):
        print("worker - {0} start to work".format(self._worker_id))
        job = {}
        while not self._stop_event.is_set():
            try:
                job = self._in_queue.get(timeout=.01)
            except Empty:
                continue

            if not job:
                self._in_queue.task_done()
                break

            try:
                proc = current_process()
                self._run_queue.put((proc.name, job))
                r = self._run_job(job)
                self._out_queue.put(r)
            except Exception as err:
                print('Unhandled exception: {0}'.format(err))
                result = {"status": 'failed'}
                self._out_queue.put(result)
            finally:
                self._in_queue.task_done()

    def _run_job(self, job):
        time.sleep(job)
        return {
            'status': 'succeed'
        }


def main():
    jobs = [3, 4, 5, 6, 7]
    procs_num = 3
    m = WorkersManager(jobs, procs_num)
    m.run()


if __name__ == "__main__":
    main()
The problematic code is the following:
self._out_queue.put(result)
for _ in range(2):
    # ISSUE HERE !!!
    # NOTE: Call task_done twice
    # Guessing:
    # 1st time to switch process?
    # 2nd time to notify that the task is done?
    # TODO: figure out why?
    self._in_queue.task_done()
I need to invoke self._in_queue.task_done() twice to notify the JoinableQueue that the job has been completed by the exception-handling logic. I guessed that the first task_done() call might be needed to switch the process context, or something else. According to my testing, only the second task_done() takes effect:
worker - 0 start to work
worker - 1 start to work
worker - 2 start to work
Process TIMEOUT: Process-1 5
Process TIMEOUT: Process-2 6
Process TIMEOUT: Process-3 7
Total 5 jobs done.
If task_done() is called only once, the program blocks forever and never finishes.
Upvotes: 0
Views: 590
Reputation: 44148
The problem is that you have a race condition, defined as:
A race condition arises in software when a computer program, to operate properly, depends on the sequence or timing of the program's processes or threads.
In method WorkerProcess._work, your main loop begins:
while not self._stop_event.is_set():
    try:
        job = self._in_queue.get(timeout=.01)
    except Empty:
        continue

    if not job:
        self._in_queue.task_done()
        break
self._stop_event is being set by the _collect thread. Depending on where WorkerProcess._work is in the loop when this occurs, it can exit the loop leaving behind the None that has been placed on the _in_queue to signify no more jobs. Clearly, this occurs twice for two processes here, but it could happen for 0, 1, 2 or 3 processes.
The fix is to replace while not self._stop_event.is_set(): with while True: and to just rely on finding None on the _in_queue to signify termination. This enables you to remove those extra calls to task_done for those processes that have completed normally (you actually only needed one extra call per successfully completed process instead of the two you have).
But that is half of the problem. The other half is you have in your code:
def _join_workers(self):
    for p in self._spawned_procs:
        p.join(TIMEOUT)
        ...
        p.terminate()
Therefore, you are not allowing your workers enough time to deplete the _in_queue, and thus there is the possibility of an arbitrary number of messages being left on it (in the example you have, of course, there would be just the current "job" being processed and the None sentinel, for a total of 2).
But this is the problem with the code in general: it has been over-engineered. As an example, referring back to the first code snippet above, it can be further simplified to:
while True:
    job = self._in_queue.get()  # blocking get
    if not job:
        break
Moreover, there is no reason to even be using a JoinableQueue or Event instance, since a None sentinel placed on the _in_queue is sufficient to signify that the worker processes should terminate, especially if you are going to be prematurely terminating the workers. The simplified, working code is:
import time
import threading
from multiprocessing import Process, Queue, cpu_count, current_process

TIMEOUT = 3


class WorkersManager(object):

    def __init__(self, jobs, processes_num):
        self._processes_num = processes_num if processes_num else cpu_count()
        self._workers_num = processes_num
        self._in_queue, self._run_queue, self._out_queue = Queue(), Queue(), Queue()
        self._spawned_procs = []
        self._total = 0
        self._jobs_on_procs = {}

        self._wk_kwargs = dict(
            in_queue=self._in_queue, run_queue=self._run_queue, out_queue=self._out_queue
        )

        self._in_stream = [j for j in jobs]
        self._out_stream = []
        self._total = len(self._in_stream)

    def run(self):
        # Spawn Worker
        worker_processes = [
            WorkerProcess(i, **self._wk_kwargs) for i in range(self._processes_num)
        ]
        self._spawned_procs = [
            Process(target=process.run, args=tuple())
            for process in worker_processes
        ]

        for p in self._spawned_procs:
            p.start()

        self._serve()

        monitor = threading.Thread(target=self._monitor, args=tuple())
        monitor.start()

        collector = threading.Thread(target=self._collect, args=tuple())
        collector.start()

        self._join_workers()
        # TODO: Terminate threads
        monitor.join()
        collector.join()

        return self._out_stream

    def _join_workers(self):
        for p in self._spawned_procs:
            p.join(TIMEOUT)

            if p.is_alive():
                p.terminate()
                job = self._jobs_on_procs.get(p.name)
                print('Process TIMEOUT: {0} {1}'.format(p.name, job))
                result = {
                    "status": "failed"
                }
                self._out_queue.put(result)
            else:
                if p.exitcode == 0:
                    print("{} exit with code:{}".format(p, p.exitcode))
                else:
                    job = self._jobs_on_procs.get(p.name)
                    if p.exitcode > 0:
                        print("{} with code:{} {}".format(p, p.exitcode, job))
                    else:
                        print("{} been killed with code:{} {}".format(p, p.exitcode, job))

                    result = {
                        "status": "failed"
                    }
                    self._out_queue.put(result)

    def _collect(self):
        # TODO: Spawn a collector proc
        while True:
            r = self._out_queue.get()
            self._out_stream.append(r)
            if len(self._out_stream) >= self._total:
                print("Total {} jobs done.".format(len(self._out_stream)))
                break

    def _serve(self):
        for job in self._in_stream:
            self._in_queue.put(job)

        for _ in range(self._workers_num):
            self._in_queue.put(None)

    def _monitor(self):
        running = 0
        while True:
            proc_name, job = self._run_queue.get()
            running += 1
            self._jobs_on_procs.update({proc_name: job})
            if running == self._total:
                break


class WorkerProcess(object):

    def __init__(self, worker_id, in_queue, run_queue, out_queue):
        self._worker_id = worker_id
        self._in_queue = in_queue
        self._run_queue = run_queue
        self._out_queue = out_queue

    def run(self):
        self._work()
        print('worker - {} quit'.format(self._worker_id))

    def _work(self):
        print("worker - {0} start to work".format(self._worker_id))
        job = {}
        while True:
            job = self._in_queue.get()
            if not job:
                break

            try:
                proc = current_process()
                self._run_queue.put((proc.name, job))
                r = self._run_job(job)
                self._out_queue.put(r)
            except Exception as err:
                print('Unhandled exception: {0}'.format(err))
                result = {"status": 'failed'}
                self._out_queue.put(result)

    def _run_job(self, job):
        time.sleep(job)
        return {
            'status': 'succeed'
        }


def main():
    jobs = [3, 4, 5, 6, 7]
    procs_num = 3
    m = WorkersManager(jobs, procs_num)
    m.run()


if __name__ == "__main__":
    main()
Prints:
worker - 0 start to work
worker - 1 start to work
worker - 2 start to work
Process TIMEOUT: Process-1 3
Process TIMEOUT: Process-2 6
Process TIMEOUT: Process-3 7
Total 5 jobs done.
You are probably aware of this, but due diligence requires that I mention that there are two excellent classes, multiprocessing.Pool and concurrent.futures.ProcessPoolExecutor, for doing what you want to accomplish. See this for some comparisons.
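As a rough illustration only (this is not the original code; run_job and the job values are stand-ins mirroring the question's example), the same fan-out/collect pattern with ProcessPoolExecutor could look like:

```python
import time
from concurrent.futures import ProcessPoolExecutor, as_completed

def run_job(job):
    # Stand-in for the question's WorkerProcess._run_job
    time.sleep(job)
    return {'status': 'succeed'}

def run_all(jobs, processes_num=3):
    # Submit every job and collect either its result or a failure marker,
    # the role played by _serve/_collect in the original code.
    results = []
    with ProcessPoolExecutor(max_workers=processes_num) as executor:
        futures = [executor.submit(run_job, job) for job in jobs]
        for future in as_completed(futures):
            try:
                results.append(future.result())
            except Exception:
                results.append({'status': 'failed'})
    return results
```

Note that unlike the terminate() approach above, a ProcessPoolExecutor cannot forcibly kill a task that is already running; future.result(timeout=...) only bounds how long you wait for the result.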
Further Explanation
What is the point of using a JoinableQueue, which supports calls to task_done? Usually, it is so that you can be sure that all of the messages you have placed on the queue have been taken off the queue and processed, and that the main process will not terminate prematurely before that has occurred. But this could not work correctly in the code as you had it, because you were giving your processes only TIMEOUT seconds to process their messages and then terminating any process that was still alive, with the possibility that messages were still left on its queue. This is what forced you to artificially issue extra calls to task_done just so your calls to join on the queues in the main process would not hang, and why you had to post this question to begin with.
So there are two ways you could have proceeded differently. One way would have allowed you to continue using JoinableQueue instances and calling join on those instances to know when to terminate. But (1) you would not then be able to prematurely terminate your message processes and (2) your message processes must handle exceptions correctly so that they do not terminate prematurely without emptying their queues.
The other way is what I proposed, which is much simpler. The main process simply places a special sentinel message, in this case None, on the input queue. This is just a message that cannot be mistaken for an actual message to be processed; it signifies end of file or, in other words, signals to the message process that no more messages will be placed on the queue and it may now terminate. Thus, in addition to the "real" messages, the main process just has to place the sentinel message on the queue, and then, instead of calling join on the message queues (which are now regular, non-joinable queues), it calls join(TIMEOUT) on each process instance. You will either find the process no longer alive because it has seen the sentinel, in which case you know it has processed all of its messages, or you can call terminate on the process if you are willing to leave messages on its input queue.
Of course, to be really sure that the processes that terminated on their own actually emptied their queues, you might have to check that the queues are indeed empty. But I assume that you should be able to code your processes to handle exceptions correctly, at least those that can be handled, so that they do not terminate prematurely and do something "reasonable" with every message.
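As a sketch of that final check, here is a hypothetical drain helper (not part of the original code) that empties whatever is left on a queue, skipping the None sentinel; anything it returns is a job that was never processed:

```python
from queue import Empty

def drain(q):
    # Collect any non-sentinel items still left on a queue.
    # Works for both queue.Queue and multiprocessing.Queue, since
    # both raise queue.Empty from get_nowait().
    leftovers = []
    while True:
        try:
            item = q.get_nowait()
        except Empty:
            return leftovers
        if item is not None:
            leftovers.append(item)
```

For a multiprocessing.Queue this check is inherently racy (empty() and get_nowait() are documented as unreliable while producers are still running), so it should only be used after the worker processes have been joined.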
Upvotes: 3