duckworthd

Reputation: 15217

How to add tasks to a Pool in a task executed by the same Pool?

Suppose I have a task executed by a multiprocessing.Pool. How do I allow this task to add new tasks to the Pool executing it? For example,

import multiprocessing

def integers(pool, queue, n):
    print("integers(%d)" % n)
    queue.put(n)
    pool.apply_async(integers, (pool, queue, n + 1))  # crashes; can't pickle `pool`

def start():
    pool = multiprocessing.Pool()
    queue = multiprocessing.Queue()
    integers(pool, queue, 1)
    while True:
        yield queue.get()

Upvotes: 1

Views: 2270

Answers (1)

Bakuriu

Reputation: 101989

It's not possible to pickle a Pool, so you have to find a workaround if you want workers to be able to add tasks.
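To see why, here's a quick standalone check (my own illustration, not part of the original answer) showing that CPython refuses to pickle a Pool:

import multiprocessing
import pickle

if __name__ == "__main__":
    pool = multiprocessing.Pool()
    try:
        pickle.dumps(pool)
    except Exception as exc:
        # On CPython this raises NotImplementedError:
        # "pool objects cannot be passed between processes or pickled"
        print(type(exc).__name__, exc)
    pool.close()
    pool.join()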

You can use a particular "sentinel" return value that tells the main program to add new tasks to the Pool:

while True:
    ret_value = queue.get()
    if is_sentinel(ret_value):
        pool.apply_async(*get_arguments(ret_value))
    yield ret_value

Here is_sentinel returns True whenever the return value signals that more jobs must be added to the Pool, and get_arguments fetches the arguments to be passed on to apply_async.

The simplest implementation of such functions could be:

def is_sentinel(value):
    """Assume only sentinel values are tuples (or sequences)."""
    return isinstance(value, tuple)
    # or, using duck-typing:
    # try:
    #     len(value)
    # except TypeError:
    #     return False
    # else:
    #     return True


def get_arguments(value):
    """Assume that the sentinel value *is* the tuple of arguments
    to pass to the Pool.
    """
    return value
    # or return value[1:] if you want to include a "normal" return value

The function passed to apply_async returns a tuple (or a sequence) only when it wants to add new tasks, and in that case it doesn't supply a normal return value. It's easy to also provide a return value: for example, the first element of the tuple could be the "normal" return value.
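A minimal sketch of that variation (my own, building on the comment in get_arguments above), where a sentinel is the tuple (normal_result, func, args) and a hypothetical results() helper both schedules and yields:

def is_sentinel(value):
    return isinstance(value, tuple)

def get_arguments(value):
    # value is (normal_result, func, args); skip the result itself
    return value[1:]

def results(pool, queue):
    """Yield normal results, scheduling any task a sentinel carries."""
    while True:
        ret_value = queue.get()
        if is_sentinel(ret_value):
            pool.apply_async(*get_arguments(ret_value))
            ret_value = ret_value[0]  # the "normal" part of the sentinel
        yield ret_value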

A different approach is to use a second queue where the workers can put their requests. On each iteration you can call the get_nowait() method on that queue to check whether the workers have asked for more jobs to be added to the pool.


Your example using the first approach:

import multiprocessing

def is_sentinel(value):
    return isinstance(value, tuple)

def get_arguments(value):
    return value


def integers(queue, n1, n2):
    print("integers(%d)" % n1)
    queue.put(n1)
    if n1 < n2:
        queue.put((integers, (queue, n1+1, n2)))

def start():
    pool = multiprocessing.Pool()
    # a plain multiprocessing.Queue can't be passed to pool workers;
    # a Manager().Queue() can be
    queue = multiprocessing.Manager().Queue()
    m = 0
    n = 100
    pool.apply_async(integers, (queue, m, n))
    while True:
        ret_val = queue.get()
        if is_sentinel(ret_val):
            pool.apply_async(*get_arguments(ret_val))
        else:
            yield ret_val
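Consuming the generator (a hypothetical usage sketch of mine; islice bounds the loop, since start() blocks on queue.get() once the chain of tasks runs out):

import itertools

if __name__ == "__main__":
    for value in itertools.islice(start(), 10):
        print(value)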

Your example using the second approach:

import multiprocessing
import queue  # needed for the queue.Empty exception

def integers(out_queue, task_queue, n1, n2):
    print("integers(%d)" % n1)
    out_queue.put(n1)
    if n1 < n2:
        task_queue.put((n1+1, n2))


def start():
    pool = multiprocessing.Pool()
    # use Manager().Queue() so the queues can be passed to pool workers
    manager = multiprocessing.Manager()
    out_queue = manager.Queue()
    task_queue = manager.Queue()
    task_queue.put((0, 100))
    while True:
        try:
            # may be safer to use a timeout...
            m, n = task_queue.get_nowait()
            pool.apply_async(integers, (out_queue, task_queue, m, n))
        except queue.Empty:
            # no task to perform
            pass
        yield out_queue.get()
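If you'd rather not poll, the loop body can block briefly instead, as the comment above hints (the 100 ms timeout here is an arbitrary value of mine):

try:
    # wait up to 100 ms for a task request instead of polling
    m, n = task_queue.get(timeout=0.1)
    pool.apply_async(integers, (out_queue, task_queue, m, n))
except queue.Empty:
    pass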

Upvotes: 2
