fsimkovic

Reputation: 1128

Deadlock in Python's multiprocessing upon early termination

I'm creating a multiprocessing.Queue in Python and filling it with jobs, which are consumed by multiprocessing.Process instances.

I would like to add a function call that is executed after every job, which checks if a specific task has succeeded. If so, I would like to empty the Queue and terminate execution.

My Process class is:

class Worker(multiprocessing.Process):

    def __init__(self, queue, check_success=None, directory=None, permit_nonzero=False):
        super(Worker, self).__init__()
        self.check_success = check_success
        self.directory = directory
        self.permit_nonzero = permit_nonzero
        self.queue = queue

    def run(self):
        for job in iter(self.queue.get, None):
            stdout = mbkit.dispatch.cexectools.cexec([job], directory=self.directory, permit_nonzero=self.permit_nonzero)
            with open(job.rsplit('.', 1)[0] + '.log', 'w') as f_out:
                f_out.write(stdout)
            if callable(self.check_success) and self.check_success(job):
                # Terminate all remaining jobs here
                pass

And my Queue is setup here:

class LocalJobServer(object):

    @staticmethod
    def sub(command, check_success=None, directory=None, nproc=1, permit_nonzero=False, time=None, *args, **kwargs):
        if check_success and not callable(check_success):
            msg = "check_success option requires a callable function/object: {0}".format(check_success)
            raise ValueError(msg)

        # Create a new queue
        queue = multiprocessing.Queue()
        # Create nproc workers to consume jobs from the queue
        workers = []
        for _ in range(nproc):
            wp = Worker(queue, check_success=check_success, directory=directory, permit_nonzero=permit_nonzero)
            wp.start()
            workers.append(wp)
        # Add each command to the queue
        for cmd in command:
            queue.put(cmd, timeout=time)
        # Stop workers from exiting without completion
        for _ in range(nproc):
            queue.put(None)
        for wp in workers:
            wp.join()

The function call mbkit.dispatch.cexectools.cexec() is a wrapper around subprocess.Popen and returns p.stdout.
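For context, a simplified stand-in for what cexec does might look like this (an approximation based on the description above, not the actual mbkit implementation):

import subprocess

def cexec(cmd, directory=None, permit_nonzero=False):
    """Minimal stand-in for mbkit.dispatch.cexectools.cexec (assumed behavior)."""
    # Run the command in the given directory and capture its stdout as text
    p = subprocess.Popen(cmd, cwd=directory, stdout=subprocess.PIPE,
                         stderr=subprocess.STDOUT, universal_newlines=True)
    stdout, _ = p.communicate()
    # Complain about a non-zero exit status unless explicitly permitted
    if p.returncode != 0 and not permit_nonzero:
        raise RuntimeError("{0} exited with code {1}".format(cmd, p.returncode))
    return stdout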

In the Worker class, I've written the conditional to check if a job succeeded, and tried emptying the remaining jobs in the Queue using a while loop, i.e. my Worker.run() function looked like this:

def run(self):
    for job in iter(self.queue.get, None):
        stdout = mbkit.dispatch.cexectools.cexec([job], directory=self.directory, permit_nonzero=self.permit_nonzero)
        with open(job.rsplit('.', 1)[0] + '.log', 'w') as f_out:
            f_out.write(stdout)
        if callable(self.check_success) and self.check_success(job):
            break
    while not self.queue.empty():
        self.queue.get()

Although this works sometimes, it usually deadlocks and my only option is to Ctrl-C. I am aware that .empty() is unreliable, thus my question.

Any advice on how I can implement such an early termination functionality?

Upvotes: 1

Views: 1164

Answers (2)

Thomas Moreau

Reputation: 4467

You do not have a deadlock here. It is just linked to the behavior of multiprocessing.Queue: the get method is blocking by default, so when you call get on an empty queue, the call stalls, waiting for the next element to be ready. Some of your workers will stall because your while not self.queue.empty() loop removes all the None sentinels, so the remaining workers block on the now-empty Queue, as in this code:

from multiprocessing import Queue
q = Queue()
for e in iter(q.get, None):  # q.get() blocks forever because the queue is empty
    print(e)

To be notified when the queue is empty, you need to use a non-blocking call. You can for instance use q.get_nowait(), or pass a timeout as in q.get(timeout=1). Both raise a multiprocessing.queues.Empty exception when the queue is empty. So you should replace your Worker's for job in iter(...): loop with something like:

while not queue.empty():
    try:
        job = queue.get(timeout=.1)
    except multiprocessing.queues.Empty:
        continue
    # Do stuff with your job

This way, you will never be stuck waiting on an empty queue.
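As a minimal, self-contained illustration of this non-blocking draining pattern (standalone, not part of the original code):

import multiprocessing
import multiprocessing.queues  # provides the Empty exception used below

q = multiprocessing.Queue()
q.put("only-job")

# Drain the queue without ever blocking: get(timeout=...) raises Empty
# once nothing arrives within the timeout, instead of hanging forever.
while True:
    try:
        job = q.get(timeout=0.1)
    except multiprocessing.queues.Empty:
        break
    print(job)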

For the synchronization part, I would recommend using a synchronization primitive such as multiprocessing.Condition or multiprocessing.Event. They are designed for this purpose and are cleaner than a Value. Something like this should help:

def run(self):
    while not self.queue.empty():
        try:
            job = self.queue.get(timeout=.1)
        except multiprocessing.queues.Empty:
            continue
        if self.event.is_set():
            continue
        stdout = mbkit.dispatch.cexectools.cexec([job], directory=self.directory, permit_nonzero=self.permit_nonzero)
        with open(job.rsplit('.', 1)[0] + '.log', 'w') as f_out:
            f_out.write(stdout)
        if callable(self.check_success) and self.check_success(job):
            self.event.set()
    print("Worker {} terminated cleanly".format(self.name))

where self.event is a multiprocessing.Event() passed to each Worker when it is created.
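For completeness, the wiring could look roughly like this; the start_workers helper is hypothetical and only illustrates how sub() would create the Event and hand it to every Worker:

import multiprocessing

class Worker(multiprocessing.Process):
    """Sketch: same Worker as above, but carrying a shared Event."""

    def __init__(self, queue, event, check_success=None, directory=None, permit_nonzero=False):
        super(Worker, self).__init__()
        self.queue = queue
        self.event = event                  # shared "a job succeeded" flag
        self.check_success = check_success
        self.directory = directory
        self.permit_nonzero = permit_nonzero

    # run() as in the snippet above


def start_workers(queue, nproc, check_success=None, directory=None, permit_nonzero=False):
    """Hypothetical helper: create the shared Event and hand it to every Worker."""
    event = multiprocessing.Event()
    workers = []
    for _ in range(nproc):
        wp = Worker(queue, event, check_success=check_success,
                    directory=directory, permit_nonzero=permit_nonzero)
        wp.start()
        workers.append(wp)
    return workers, event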

Note that it is also possible to use a multiprocessing.Pool to avoid dealing with the queue and the workers yourself. But as you need a synchronization primitive, it might be a bit more complicated to set up. Something like this should work:

def worker(job, success, check_success=None, directory=None, permit_nonzero=False):
    if success.is_set():
        return False
    stdout = mbkit.dispatch.cexectools.cexec([job], directory=directory, permit_nonzero=permit_nonzero)
    with open(job.rsplit('.', 1)[0] + '.log', 'w') as f_out:
        f_out.write(stdout)
    if callable(check_success) and check_success(job):
        success.set()
    return True

# ......
# In the class LocalJobServer
# .....

def sub(command, check_success=None, directory=None, nproc=1, permit_nonzero=False):

    mgr = multiprocessing.Manager()
    success = mgr.Event()

    pool = multiprocessing.Pool(nproc)
    run_args = [(cmd, success, check_success, directory, permit_nonzero) for cmd in command]
    result = pool.starmap(worker, run_args)

    pool.close()
    pool.join()

Note here that I use a Manager, as you cannot pass a multiprocessing.Event directly as an argument to the pool's workers. You could also use the Pool's initializer and initargs arguments to set up a global success event in each worker and avoid relying on the Manager, but it is slightly more complicated.
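A sketch of that initializer/initargs variant could look like the following; the module-level _success name and the init_worker helper are introduced here purely for illustration (and mbkit is assumed to be imported, as elsewhere):

import multiprocessing

_success = None  # set in each worker process by the initializer


def init_worker(event):
    # Store the shared Event in a module-level global of the worker process
    global _success
    _success = event


def worker(job, check_success=None, directory=None, permit_nonzero=False):
    if _success.is_set():
        return False
    stdout = mbkit.dispatch.cexectools.cexec([job], directory=directory, permit_nonzero=permit_nonzero)
    with open(job.rsplit('.', 1)[0] + '.log', 'w') as f_out:
        f_out.write(stdout)
    if callable(check_success) and check_success(job):
        _success.set()
    return True


def sub(command, check_success=None, directory=None, nproc=1, permit_nonzero=False):
    success = multiprocessing.Event()
    # initializer/initargs run once per worker process, so the raw Event is
    # shared through process inheritance rather than through a Manager proxy
    pool = multiprocessing.Pool(nproc, initializer=init_worker, initargs=(success,))
    run_args = [(cmd, check_success, directory, permit_nonzero) for cmd in command]
    results = pool.starmap(worker, run_args)
    pool.close()
    pool.join()
    return results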

Upvotes: 1

fsimkovic

Reputation: 1128

This might not be the optimal solution, and any other suggestion is much appreciated, but I managed to solve the problem as such:

class Worker(multiprocessing.Process):
    """Simple manual worker class to execute jobs in the queue"""

    def __init__(self, queue, success, check_success=None, directory=None, permit_nonzero=False):
        super(Worker, self).__init__()
        self.check_success = check_success
        self.directory = directory
        self.permit_nonzero = permit_nonzero
        self.success = success
        self.queue = queue

    def run(self):
        """Method representing the process's activity"""
        for job in iter(self.queue.get, None):
            if self.success.value:
                continue
            stdout = mbkit.dispatch.cexectools.cexec([job], directory=self.directory, permit_nonzero=self.permit_nonzero)
            with open(job.rsplit('.', 1)[0] + '.log', 'w') as f_out:
                f_out.write(stdout)
            if callable(self.check_success) and self.check_success(job):
                self.success.value = int(True)
            time.sleep(1)


class LocalJobServer(object):
    """A local server to execute jobs via the multiprocessing module"""

    @staticmethod
    def sub(command, check_success=None, directory=None, nproc=1, permit_nonzero=False, time=None, *args, **kwargs):
        if check_success and not callable(check_success):
            msg = "check_success option requires a callable function/object: {0}".format(check_success)
            raise ValueError(msg)

        # Create a new queue
        queue = multiprocessing.Queue()
        success = multiprocessing.Value('i', int(False))
        # Create nproc workers to consume jobs from the queue
        workers = []
        for _ in range(nproc):
            wp = Worker(queue, success, check_success=check_success, directory=directory, permit_nonzero=permit_nonzero)
            wp.start()
            workers.append(wp)
        # Add each command to the queue
        for cmd in command:
            queue.put(cmd)
        # Stop workers from exiting without completion
        for _ in range(nproc):
            queue.put(None)
        # Start the workers
        for wp in workers:
            wp.join(time)

Basically, I'm creating a Value and providing it to each Process. Once a job is marked as successful, this variable gets updated. Each Process checks, via if self.success.value: continue, whether a success has been recorded and, if so, simply iterates over the remaining jobs in the Queue until it is empty.

The time.sleep(1) call is required to account for potential syncing delays amongst the processes. This is certainly not the most efficient approach but it works.
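For anyone unfamiliar with multiprocessing.Value, here is a tiny standalone sketch of the sharing mechanism used here (illustrative only, not part of the job server):

import multiprocessing
import time

def toggle(flag):
    # Child process flips the shared flag after a short delay
    time.sleep(0.5)
    flag.value = int(True)

if __name__ == '__main__':
    flag = multiprocessing.Value('i', int(False))
    p = multiprocessing.Process(target=toggle, args=(flag,))
    p.start()
    while not flag.value:   # the parent polls until it sees the child's update
        time.sleep(0.1)
    p.join()
    print("flag set:", bool(flag.value))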

Upvotes: 0
