Vishal

Reputation: 3296

multiprocessing - Cancel remaining jobs in a pool without destroying the Pool

I'm using map_async on a pool of 4 workers, giving it a list of image files to process [Set 1].
At times, I need to cancel the processing in between, so that I can instead get a different set of files processed [Set 2].

So an example situation is: I give map_async 1000 files to process, and then want to cancel the processing of the remaining jobs after about 200 files have been processed.
Additionally, I want to do this cancellation without destroying/terminating the pool. Is this possible?

I do not want to terminate the pool, because recreating the pool is a slow process on Windows (it uses 'spawn' instead of 'fork'), and I need to use this same pool for processing a different set of image files [Set 2].

import multiprocessing
from multiprocessing import Pool

# Putting job_set1 through processing. It may consist of 1000 images
cpu = multiprocessing.cpu_count()
pool = Pool(processes=cpu)
result = pool.map_async(job_set1, thumb_ts_list, chunksize=chunksize)

Now, in between, I need to cancel the processing of Set 1 and move on to a different set (waiting for all 1000 images to complete processing is not an option, but I can wait for the image currently being processed to finish).

<Somehow cancel processing of job_set1>
result = pool.map_async(job_set2, thumb_ts_list, chunksize=chunksize)

Upvotes: 3

Views: 1199

Answers (2)

Davis Herring

Reputation: 39818

It's time for the fundamental theorem of software engineering: while multiprocessing.Pool doesn't supply cancellation as a feature, we can add it by having a Pool read from a carefully crafted iterable. It's not enough, however, to have a generator that yields values from a list but stops short on some signal, because the Pool eagerly drains any generator given to it. So we need a very carefully crafted iterable.

A lazy Pool

The generic tool we need is a way to construct tasks for a Pool only when a worker becomes available (or at most one task ahead, in case constructing them takes significant time). The basic idea is to slow down the thread collecting work for the Pool with a semaphore upped only when a task is finished. (We know such a thread exists from the observable behavior of imap_unordered.)

import multiprocessing
from threading import Semaphore

size=multiprocessing.cpu_count()  # or whatever Pool size to use

# How many workers are waiting for work?  Add one to buffer one task.
work=Semaphore(size)

def feed0(it):
  it=iter(it)
  try:
    while True:
      # Don't ask the iterable until we have a customer, in case better
      # instructions become available:
      work.acquire()
      yield next(it)
  except StopIteration: pass
  # Give back the permit acquired for the next() call that raised StopIteration.
  work.release()
def feed(p,f,it):
  import sys,traceback
  iu=p.imap_unordered(f,feed0(it))
  while True:
    try: x=next(iu)
    except StopIteration: return
    except Exception:
      # The failed task still freed a worker, so release the semaphore,
      # but don't yield a stale result for it.
      traceback.print_exception(*sys.exc_info())
      work.release()
      continue
    work.release()
    yield x

The try in feed prevents failures in the children from breaking the semaphore's count, but note that it does not protect against failures in the parent.
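
One way to guard the parent side, as a minimal sketch on top of the code above (handle_result is a hypothetical callback; pool, f and items stand for the Pool, the worker function and the input iterable), is to catch exceptions inside the consuming loop, so feed is never abandoned mid-run and every completed task is still matched with a work.release():

import traceback

# Sketch only: pool, f and items come from the surrounding code;
# handle_result is a hypothetical per-result callback.
for x in feed(pool, f, items):
    try:
        handle_result(x)
    except Exception:
        # Log and keep consuming so the semaphore count stays consistent.
        traceback.print_exc()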

A cancelable iterator

Now that we have real-time control over the Pool input, making whatever scheduling policy we like is straightforward. For example, here's something like itertools.chain but with the ability to asynchronously discard any remaining elements from one of the input sequences:

import collections,queue

class Cancel:
  closed=False
  cur=collections.deque()  # empty until the first added deque is fetched
  def __init__(self): self.data=queue.Queue() # of deques
  def add(self,d):
    d=collections.deque(d)
    self.data.put(d)
    return d
  def __iter__(self):
    while True:
      try: yield self.cur.popleft()
      except IndexError:
        self.cur=self.data.get()
        if self.cur is None: break
  @staticmethod
  def cancel(d): d.clear()
  def close(self): self.data.put(None)

This is thread-safe (in CPython at least) despite the lack of locking because operations like deque.clear are atomic with respect to Python inspection (and we don't separately check whether self.cur is empty).

Usage

Making one of these looks like

pool=multiprocessing.Pool(size)
can=Cancel()
many=can.add(range(1000))
few=can.add(["some","words"])
can.close()
for x in feed(pool,assess_happiness,can):
  if happy_with(x): can.cancel(many)  # straight onto few, then out

where of course the adds and close could themselves be in the loop.
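
Mapped onto the question's two image sets, a hedged sketch could look like the following; only size, feed and Cancel come from the code above, while process_image, set1_image_files, set2_image_files, handle and switch_to_set2 are placeholder names:

import multiprocessing

# Sketch of the question's Set 1 / Set 2 switch; the pool itself is never
# restarted, only the queued input changes.
pool = multiprocessing.Pool(size)
can = Cancel()
set1 = can.add(set1_image_files)         # e.g. 1000 image paths

def switch_to_set2():
    # Call this from wherever the "cancel Set 1" decision is made (another
    # thread, a GUI callback, ...); Cancel is safe to poke from there.
    can.cancel(set1)                     # skip the unprocessed Set 1 files
    can.add(set2_image_files)            # the same Pool moves on to Set 2
    can.close()                          # nothing further is coming

for thumbnail in feed(pool, process_image, can):
    handle(thumbnail)                    # hypothetical result handler

Only the Set 1 images already handed to the Pool still finish; everything queued behind them is dropped. If switch_to_set2 is never called, the loop simply blocks after Set 1 waiting for the next add or close, which suits a long-lived pool like the one in the question.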

Upvotes: 1

gz.

Reputation: 6711

The multiprocessing module doesn't seem to have the concept of cancellation. You can use the concurrent.futures.ProcessPoolExecutor wrapper and cancel the pending futures when you have enough results.

Here's an example that picks out 10 JPEGs from a set of paths, and cancels pending futures while leaving the process pool usable afterwards:

import concurrent.futures


def interesting_path(path):
    """Gives path if is a JPEG else ``None``."""
    with open(path, 'rb') as f:
        if f.read(3) == b'\xff\xd8\xff':
            return path
        return None


def find_interesting(paths, count=10):
    """Yield up to count paths from paths which are 'interesting', via multiprocess tasks."""
    with concurrent.futures.ProcessPoolExecutor() as pool:
        futures = {pool.submit(interesting_path, p) for p in paths}
        print('Started {}'.format(len(futures)))
        for future in concurrent.futures.as_completed(futures):
            res = future.result()
            futures.remove(future)
            if res is not None:
                yield res
                count -= 1
                if count == 0:
                    break
        cancelled = 0
        for future in futures:
            cancelled += future.cancel()
        print('Cancelled {}'.format(cancelled))
        concurrent.futures.wait(futures)
        # Can still use pool here for more processing as needed

Note that picking how to break up work into futures is still tricky: a bigger set of futures means more overhead, but can also mean less wasted work when cancelling. This can also be adapted to Python 3.6 async syntax easily enough.
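
For instance, the coarser-grained end of that trade-off might look like this sketch, which reuses interesting_path from above and assumes paths is a list; find_interesting_batched, interesting_paths_chunk and batch_size are illustrative names:

import concurrent.futures


def interesting_paths_chunk(paths):
    """Filter one batch of paths in a single task (illustrative helper)."""
    return [p for p in paths if interesting_path(p) is not None]


def find_interesting_batched(paths, count=10, batch_size=50):
    """Like find_interesting, but one future per batch: fewer futures and
    less submission overhead, at the cost of more wasted work on cancel."""
    batches = [paths[i:i + batch_size] for i in range(0, len(paths), batch_size)]
    with concurrent.futures.ProcessPoolExecutor() as pool:
        futures = {pool.submit(interesting_paths_chunk, b) for b in batches}
        found = []
        for future in concurrent.futures.as_completed(futures):
            futures.remove(future)
            found.extend(future.result())
            if len(found) >= count:
                break
        cancelled = sum(future.cancel() for future in futures)
        print('Cancelled {}'.format(cancelled))
        concurrent.futures.wait(futures)
        return found[:count]

A larger batch_size means fewer futures to create and track, but when we break out early a whole in-flight batch is wasted rather than a single path.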

Upvotes: 0
