Reputation: 1128
I'm creating a `multiprocessing.Queue` in Python and adding `multiprocessing.Process` instances to this `Queue`.
I would like to add a function call that is executed after every job, which checks if a specific task has succeeded. If so, I would like to empty the `Queue` and terminate execution.
My `Process` class is:
    class Worker(multiprocessing.Process):

        def __init__(self, queue, check_success=None, directory=None, permit_nonzero=False):
            super(Worker, self).__init__()
            self.check_success = check_success
            self.directory = directory
            self.permit_nonzero = permit_nonzero
            self.queue = queue

        def run(self):
            for job in iter(self.queue.get, None):
                stdout = mbkit.dispatch.cexectools.cexec([job], directory=self.directory, permit_nonzero=self.permit_nonzero)
                with open(job.rsplit('.', 1)[0] + '.log', 'w') as f_out:
                    f_out.write(stdout)
                if callable(self.check_success) and self.check_success(job):
                    # Terminate all remaining jobs here
                    pass
And my `Queue` is set up here:
    class LocalJobServer(object):

        @staticmethod
        def sub(command, check_success=None, directory=None, nproc=1, permit_nonzero=False, time=None, *args, **kwargs):
            if check_success and not callable(check_success):
                msg = "check_success option requires a callable function/object: {0}".format(check_success)
                raise ValueError(msg)
            # Create a new queue
            queue = multiprocessing.Queue()
            # Create workers equivalent to the number of jobs
            workers = []
            for _ in range(nproc):
                wp = Worker(queue, check_success=check_success, directory=directory, permit_nonzero=permit_nonzero)
                wp.start()
                workers.append(wp)
            # Add each command to the queue
            for cmd in command:
                queue.put(cmd, timeout=time)
            # Stop workers from exiting without completion
            for _ in range(nproc):
                queue.put(None)
            for wp in workers:
                wp.join()
The function call `mbkit.dispatch.cexectools.cexec()` is a wrapper around `subprocess.Popen` and returns `p.stdout`.
In the `Worker` class, I've written the conditional to check if a job succeeded, and tried emptying the remaining jobs in the `Queue` using a `while` loop, i.e. my `Worker.run()` function looked like this:
    def run(self):
        for job in iter(self.queue.get, None):
            stdout = mbkit.dispatch.cexectools.cexec([job], directory=self.directory, permit_nonzero=self.permit_nonzero)
            with open(job.rsplit('.', 1)[0] + '.log', 'w') as f_out:
                f_out.write(stdout)
            if callable(self.check_success) and self.check_success(job):
                break
        while not self.queue.empty():
            self.queue.get()
Although this works sometimes, it usually deadlocks and my only option is to `Ctrl-C`. I am aware that `.empty()` is unreliable, thus my question.
Any advice on how I can implement such an early termination functionality?
Upvotes: 1
Views: 1164
Reputation: 4467
You do not have a deadlock here. It is just linked to the behavior of `multiprocessing.Queue`, as the `get` method is blocking by default. Thus when you call `get` on an empty queue, the call stalls, waiting for the next element to be ready. Some of your workers stall because, when you use your `while not self.queue.empty()` loop to empty the queue, you also remove all the `None` sentinels, so the other workers block on the empty `Queue`, like in this code:
    from multiprocessing import Queue

    q = Queue()
    # Blocks forever: the queue is empty and no None sentinel ever arrives
    for e in iter(q.get, None):
        print(e)
To be notified when the queue is empty, you need to use a non-blocking call. You can for instance use `q.get_nowait()`, or pass a timeout as in `q.get(timeout=1)`. Both raise the `queue.Empty` exception (`Queue.Empty` on Python 2, also reachable as `multiprocessing.queues.Empty`) when the queue is empty. So you should replace the `for job in iter(...):` loop in your `Worker` by something like:
    while not queue.empty():
        try:
            job = queue.get(timeout=.1)
        except multiprocessing.queues.Empty:
            continue
        # Do stuff with your job
This way a worker never gets stuck waiting on an empty queue.
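As a minimal, self-contained sketch of the timeout idea (not the exact loop above): the helper name `consume` and its `handle` callback are made up for illustration, and it deliberately ends the loop when `get` times out instead of polling `empty()`. Whether giving up after a timeout is acceptable depends on how quickly your producer fills the queue, so treat the `timeout` value as an assumption to tune.

```python
import multiprocessing
import queue  # queue.Empty is what Queue.get raises on timeout (Python 3)


def consume(jobs, handle, timeout=0.5):
    """Handle jobs until a None sentinel arrives or nothing shows up for `timeout` seconds.

    `consume` and `handle` are hypothetical names used only for this sketch.
    """
    handled = []
    while True:
        try:
            job = jobs.get(timeout=timeout)
        except queue.Empty:
            break  # nothing arrived in time: stop instead of blocking forever
        if job is None:
            break  # sentinel: the producer says there is no more work
        handled.append(handle(job))
    return handled


if __name__ == "__main__":
    jobs = multiprocessing.Queue()
    for name in ("a", "b", "c"):
        jobs.put(name)
    # No sentinel is queued on purpose: the timeout is what ends the loop here.
    print(consume(jobs, str.upper))
```

With the default blocking `get()`, the same call would hang after the last item because no sentinel was queued.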
For the synchronization part, I would recommend using a synchronization primitive such as `multiprocessing.Condition` or a `multiprocessing.Event`. This is cleaner than a `Value`, as they are designed for this purpose. Something like this should help:
    def run(self):
        while not self.queue.empty():
            try:
                job = self.queue.get(timeout=.1)
            except multiprocessing.queues.Empty:
                continue
            # Skip the remaining jobs once another worker has reported success
            if self.event.is_set():
                continue
            stdout = mbkit.dispatch.cexectools.cexec([job], directory=self.directory, permit_nonzero=self.permit_nonzero)
            with open(job.rsplit('.', 1)[0] + '.log', 'w') as f_out:
                f_out.write(stdout)
            if callable(self.check_success) and self.check_success(job):
                self.event.set()
        print("Worker {} terminated cleanly".format(self.name))
where `self.event` is an `event = multiprocessing.Event()` created in the parent process and passed to every `Worker`.
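Here is a rough, runnable sketch of the `Event` approach using plain functions instead of a `Process` subclass. The names `run_job`, `is_target` and `worker_loop` are placeholders I made up (in particular `run_job` stands in for the real command execution), and the sketch keeps the `iter(queue.get, None)` sentinel loop from the question rather than the `empty()` check, on the assumption that workers may be started before the queue has been filled.

```python
import multiprocessing


def run_job(job):
    """Hypothetical stand-in for the real command execution."""
    return "output of {}".format(job)


def is_target(job):
    """Hypothetical success check: pretend 'job-3' is the one we are waiting for."""
    return job == "job-3"


def worker_loop(jobs, stop_event, check_success):
    """Consume jobs until the None sentinel; skip the work once stop_event is set."""
    executed = []
    for job in iter(jobs.get, None):
        if stop_event.is_set():
            continue  # keep consuming so this worker still reaches its sentinel
        run_job(job)
        executed.append(job)
        if check_success(job):
            stop_event.set()  # tell every worker to skip what is left
    return executed


if __name__ == "__main__":
    nproc = 2
    jobs = multiprocessing.Queue()
    stop_event = multiprocessing.Event()
    workers = [
        multiprocessing.Process(target=worker_loop, args=(jobs, stop_event, is_target))
        for _ in range(nproc)
    ]
    for wp in workers:
        wp.start()
    for i in range(10):
        jobs.put("job-{}".format(i))
    for _ in range(nproc):
        jobs.put(None)  # one sentinel per worker
    for wp in workers:
        wp.join()
    print("success found:", stop_event.is_set())
```

Because no worker ever removes a sentinel that belongs to another worker, every process ends by itself and `join()` returns. Jobs that were already running when the event was set still finish; this only skips the ones that have not started yet.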
Note that it is also possible to use a `multiprocessing.Pool` to avoid dealing with the queue and the workers yourself. But as you need some synchronization primitive, it might be a bit more complicated to set up. Something like this should work:
    def worker(job, success, check_success=None, directory=None, permit_nonzero=False):
        # Skip the job if another worker has already reported success
        if success.is_set():
            return False
        stdout = mbkit.dispatch.cexectools.cexec([job], directory=directory, permit_nonzero=permit_nonzero)
        with open(job.rsplit('.', 1)[0] + '.log', 'w') as f_out:
            f_out.write(stdout)
        if callable(check_success) and check_success(job):
            success.set()
        return True

    # ......
    # In the class LocalJobServer
    # .....

    def sub(command, check_success=None, directory=None, nproc=1, permit_nonzero=False):
        mgr = multiprocessing.Manager()
        success = mgr.Event()
        pool = multiprocessing.Pool(nproc)
        # One argument tuple per command
        run_args = [(cmd, success, check_success, directory, permit_nonzero) for cmd in command]
        result = pool.starmap(worker, run_args)
        pool.close()
        pool.join()
Note here that I use a `Manager` as you cannot pass a `multiprocessing.Event` directly as an argument to the pool's tasks. You could also use the `initializer` and `initargs` arguments of the `Pool` to set up a global `success` event in each worker and avoid relying on the `Manager`, but it is slightly more complicated.
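For reference, a minimal sketch of the `initializer`/`initargs` variant could look like the following. The names `init_worker`, `run_one`, `is_target` and the module-level `_success` are all made up for the example, and the job execution itself is left out, so take it as an outline rather than a drop-in replacement.

```python
import multiprocessing

# Per-process global, filled in by init_worker when each pool process starts.
_success = None


def init_worker(event):
    """Pool initializer: remember the shared event in this worker process."""
    global _success
    _success = event


def is_target(job):
    """Hypothetical success check: pretend 'job-3' is the one we are waiting for."""
    return job == "job-3"


def run_one(job):
    """Return False if the job was skipped, True if it was executed."""
    if _success.is_set():
        return False
    # A real implementation would execute the job and write its log here.
    if is_target(job):
        _success.set()
    return True


if __name__ == "__main__":
    success = multiprocessing.Event()
    pool = multiprocessing.Pool(2, initializer=init_worker, initargs=(success,))
    results = pool.map(run_one, ["job-{}".format(i) for i in range(10)])
    pool.close()
    pool.join()
    print("executed {} of {} jobs".format(sum(results), len(results)))
```

The event is handed over when the pool processes are created, which is why no `Manager` is needed here; the price is the module-level global.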
Upvotes: 1
Reputation: 1128
This might not be the optimal solution, and any other suggestion is much appreciated, but I managed to solve the problem as such:
    class Worker(multiprocessing.Process):
        """Simple manual worker class to execute jobs in the queue"""

        def __init__(self, queue, success, check_success=None, directory=None, permit_nonzero=False):
            super(Worker, self).__init__()
            self.check_success = check_success
            self.directory = directory
            self.permit_nonzero = permit_nonzero
            self.success = success
            self.queue = queue

        def run(self):
            """Method representing the process's activity"""
            for job in iter(self.queue.get, None):
                if self.success.value:
                    continue
                stdout = mbkit.dispatch.cexectools.cexec([job], directory=self.directory, permit_nonzero=self.permit_nonzero)
                with open(job.rsplit('.', 1)[0] + '.log', 'w') as f_out:
                    f_out.write(stdout)
                if callable(self.check_success) and self.check_success(job):
                    self.success.value = int(True)
                time.sleep(1)
    class LocalJobServer(object):
        """A local server to execute jobs via the multiprocessing module"""

        @staticmethod
        def sub(command, check_success=None, directory=None, nproc=1, permit_nonzero=False, time=None, *args, **kwargs):
            if check_success and not callable(check_success):
                msg = "check_success option requires a callable function/object: {0}".format(check_success)
                raise ValueError(msg)
            # Create a new queue
            queue = multiprocessing.Queue()
            success = multiprocessing.Value('i', int(False))
            # Create workers equivalent to the number of jobs
            workers = []
            for _ in range(nproc):
                wp = Worker(queue, success, check_success=check_success, directory=directory, permit_nonzero=permit_nonzero)
                wp.start()
                workers.append(wp)
            # Add each command to the queue
            for cmd in command:
                queue.put(cmd)
            # Stop workers from exiting without completion
            for _ in range(nproc):
                queue.put(None)
            # Wait for the workers to finish
            for wp in workers:
                wp.join(time)
Basically I'm creating a `Value` and providing that to each `Process`. Once a job is marked as successful, this variable gets updated. Each `Process` checks in `if self.success.value: continue` whether we have a success and if so, just iterates over the remaining jobs in the `Queue` until empty.
The `time.sleep(1)` call is required to account for potential syncing delays amongst the processes. This is certainly not the most efficient approach but it works.
Upvotes: 0