Reputation: 15217
Suppose I have a task executed by a multiprocessing.Pool. How do I allow this task to add new tasks to the Pool executing it? For example,
import multiprocessing

def integers(pool, queue, n1, n2):
    print("integers(%d)" % n1)
    queue.put(n1)
    pool.apply_async(integers, (pool, queue, n1 + 1, n2))  # crashes; can't pickle `pool`

def start():
    pool = multiprocessing.Pool()
    queue = multiprocessing.Queue()
    integers(pool, queue, 1, 100)
    while True:
        yield queue.get()
Upvotes: 1
Views: 2270
Reputation: 101989
It's not possible to pickle a Pool, so you have to find a workaround if you want workers to be able to add tasks. You can use a particular "sentinel" return value that tells the main program to add new tasks to the Pool:
while True:
    ret_value = queue.get()
    if is_sentinel(ret_value):
        pool.apply_async(*get_arguments(ret_value))
    else:
        yield ret_value
Where is_sentinel returns True whenever the return value requires you to add more jobs to the Pool, and get_arguments extracts the arguments to be passed to the Pool.
The simplest implementation of such functions could be:
def is_sentinel(value):
    """Assume only sentinel values are tuples, or sequences."""
    return isinstance(value, tuple)
    # or, using duck-typing:
    # try:
    #     len(value)
    # except TypeError:
    #     return False
    # else:
    #     return True

def get_arguments(value):
    """Assume that the sentinel value *is* the tuple of arguments
    to pass to the Pool.
    """
    return value
    # or return value[1:] if you want to include a "normal" return value
Where the function passed to apply_async returns a tuple (or a sequence) only when it wants to add new tasks, and in that case it doesn't supply any return value. It's pretty simple to also provide a return value (for example: the first element of the tuple could be the "normal" return value).
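For example, here is a minimal sketch of that variant, assuming a sentinel of the form (normal_value, function, args); the results helper and this sentinel layout are illustrative, not part of the code above:

def is_sentinel(value):
    """Sentinels are tuples: (normal_value, function, args)."""
    return isinstance(value, tuple)

def get_arguments(value):
    """Drop the first element; the rest are the apply_async arguments."""
    return value[1:]

def results(pool, queue):
    """Yield normal values, scheduling any tasks the sentinels request."""
    while True:
        ret_value = queue.get()
        if is_sentinel(ret_value):
            pool.apply_async(*get_arguments(ret_value))
            ret_value = ret_value[0]  # unwrap the "normal" return value
        yield ret_value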
A different approach could be to use a second queue where the workers can put their requests. At each iteration you can use the get_nowait() method to check whether the workers have requested more jobs.
Your example using the first approach:
import multiprocessing

def is_sentinel(value):
    return isinstance(value, tuple)

def get_arguments(value):
    return value

def integers(queue, n1, n2):
    print("integers(%d)" % n1)
    queue.put(n1)
    if n1 < n2:
        queue.put((integers, (queue, n1 + 1, n2)))

def start():
    pool = multiprocessing.Pool()
    # a Manager queue: a plain multiprocessing.Queue can't be passed
    # as a task argument to a Pool
    queue = multiprocessing.Manager().Queue()
    m = 0
    n = 100
    pool.apply_async(integers, (queue, m, n))
    while True:
        ret_val = queue.get()
        if is_sentinel(ret_val):
            pool.apply_async(*get_arguments(ret_val))
        else:
            yield ret_val
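A possible way to consume the generator (assuming the start above; since it never terminates, itertools.islice bounds the iteration):

import itertools

if __name__ == '__main__':
    for value in itertools.islice(start(), 10):
        print("got %d" % value)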
Your example using the second approach:
import multiprocessing
import queue   # needed for the queue.Empty exception

def integers(out_queue, task_queue, n1, n2):
    print("integers(%d)" % n1)
    out_queue.put(n1)
    if n1 < n2:
        task_queue.put((n1 + 1, n2))

def start():
    pool = multiprocessing.Pool()
    # Manager queues: plain multiprocessing.Queues can't be passed
    # as task arguments to a Pool
    manager = multiprocessing.Manager()
    out_queue = manager.Queue()
    task_queue = manager.Queue()
    task_queue.put((0, 100))
    while True:
        try:
            # may be safer to use a timeout...
            m, n = task_queue.get_nowait()
            pool.apply_async(integers, (out_queue, task_queue, m, n))
        except queue.Empty:
            # no task to schedule right now
            pass
        yield out_queue.get()
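As the comment above hints, get_nowait() only sees requests that have already arrived; blocking briefly with a timeout gives in-flight requests a chance to land (the 0.1 seconds here is an arbitrary choice):

try:
    # wait a little for a task request instead of returning immediately
    m, n = task_queue.get(timeout=0.1)
    pool.apply_async(integers, (out_queue, task_queue, m, n))
except queue.Empty:
    pass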
Upvotes: 2