Reputation: 409
How can I add a new task to a multiprocessing
pool that I initialized in a parent process? This following does not work:
from multiprocessing import Pool
def child_task(x):
# the child task spawns new tasks
results = p.map(grandchild_task, [x])
return results[0]
def grandchild_task(x):
return x
if __name__ == '__main__':
p = Pool(2)
print(p.map(child_task, [0]))
# Result: NameError: name 'p' is not defined
Motivation: I need to parallelize a program which consists of various child tasks, which themselves also have child tasks (i.e., grandchild tasks). Only parallelizing the child tasks OR the grandchild tasks does not utilize all my CPU cores.
In my use-case, I have various child tasks (maybe 1-50) and many grandchild tasks per child task (maybe 100-1000).
Alternatives: If this is not possible using Python's multiprocessing package, I am happy to switch to another library that supports this.
Upvotes: 1
Views: 1247
Reputation: 409
I have looked around more, and found Ray, which addresses this exact use case using nested remote functions.
Upvotes: 0
Reputation: 44213
There is such a thing as a minimal reproducible example and then there is going beyond that to remove so much code as to end up with something that (1) is perhaps too oversimplified with the danger than an answer could miss the mark and (2) couldn't possibly run as shown (you need to enclose the code that creates the Pool and submits the task in a block that is controlled by an if __name__ == '__main__':
statement.
But based on what you have shown, I don't believe a Pool is the solution for you; you should be creating Process instances as they are required. One way to get the results from the Processes is to store them in a shareable, managed dictionary whose key is, for example, the process id of the Process that has created the result.
To expand on your example, the child task is passed two arguments, x
and y
and needs to return as a result x**2 + 'y**2
. The child task will spawn two instances of grandchild task, each one computing the square of its argument. The child task will then combine the return values from these processes using addition:
from multiprocessing import Process, Manager
import os
def child_task(results_dict, x, y):
# the child task spawns new tasks
p1 = Process(target=grandchild_task, args=(results_dict, x))
p1.start()
pid1 = p1.pid
p2 = Process(target=grandchild_task, args=(results_dict, y))
p2.start()
pid2 = p2.pid
p1.join()
p2.join()
pid = os.getpid()
results_dict[pid] = results_dict[pid1] + results_dict[pid2]
def grandchild_task(results_dict, n):
pid = os.getpid()
results_dict[pid] = n * n
def main():
manager = Manager()
results_dict = manager.dict()
p = Process(target=child_task, args=(results_dict, 2, 3))
p.start()
pid = p.pid
p.join()
# results will be stored with key p.pid:
print(results_dict[pid])
if __name__ == '__main__':
main()
Prints:
13
Update
If you really had a situation where, for example, child_task
needed to process N identical calls varying only in its arguments but it had to spawn a sub-process or two, then use a Pool as before but additionally pass a managed dictionary to child_task
to be used for spawning additional Processes (not attempting to use a Pool for this) and retrieving their results.
Update 2
The only way I could figure out for the sub-processes themselves to use pooling is to use the ProcessPoolExecutor
class from concurrent.futures
module. When I attempted to do the same thing with multiprocessing.Pool
, I got an error because we had daemon processes trying to create their own processes. But even here the only way is for each process in the pool to have its own pool of processes. You only have a finite number of processors/cores on your computer, so unless there is a bit of I/O mixed in the processing, you can create all these pools but the processes will be waiting for a chance to run. So, it's not clear what performance gains will be realized. There is also the problem of shutting down all the pools created for the child_task
sub-processes. Normally a ProcessPoolExecutor
instance is created using a with
block and when that block is terminated the pool that was created is cleaned up. But child_task
is invoked repeatedly and clearly cannot use with
block because we don't want constantly to be creating and destroying pools. What I have come here is a bit of a kludge: A third parameter is passed, either True or False, indicating whether child_task
should instigate a shutdown of its pool. The default value for this parameter is False, we don't even bother passing it. After all the actual results have been retrieved and the child_task
processes are now idle, we submit N new tasks with dummy values but with shutdown
set to True. Note that the ProcessPoolExecutor
function map
works quite a bit differently than the same function in the Pool
class (read the docs):
from concurrent.futures import ProcessPoolExecutor
import time
child_executor = None
def child_task(x, y, shutdown=False):
global child_executor
if child_executor is None:
child_executor = ProcessPoolExecutor(max_workers=1)
if shutdown:
if child_executor:
child_executor.shutdown(False)
child_executor = None
time.sleep(.2) # make sure another process in the pool gets the next task
return None
# the child task spawns new task(s)
future = child_executor.submit(grandchild_task, y)
# we can compute one of the results using the current process:
return grandchild_task(x) + future.result()
def grandchild_task(n):
return n * n
def main():
N_WORKERS = 2
with ProcessPoolExecutor(max_workers=N_WORKERS) as executor:
# first call is (1, 2), second call is (3, 4):
results = [result for result in executor.map(child_task, (1, 3), (2, 4))]
print(results)
# force a shutdown
# need N_WORKERS invocations:
[result for result in executor.map(child_task, (0,) * N_WORKERS, (0,) * N_WORKERS, (True,) * N_WORKERS)]
if __name__ == '__main__':
main()
Prints:
[5, 25]
Upvotes: 3
Reputation: 6533
Check this solution:
#!/usr/bin/python
# requires Python version 3.8 or higher
from multiprocessing import Queue, Process
import time
from random import randrange
import os
import psutil
# function to be run by each child process
def square(number):
sleep = randrange(5)
time.sleep(sleep)
print(f'Result is {number * number}, computed by pid {os.getpid()}...sleeping {sleep} secs')
# create a queue where all tasks will be placed
queue = Queue()
# indicate how many number of children you want the system to create to run the tasks
number_of_child_proceses = 5
# put all tasks in the queue above
for task in range(19):
queue.put(task)
# this the main entry/start of the program when you run
def main():
number_of_task = queue.qsize()
print(f'{"_" * 60}\nBatch: {number_of_task // number_of_child_proceses + 1} \n{"_" * 60}')
# don't create more number of children than the number of tasks. Also, in the last round, wait for all child process
# to complete so as to wrap up everything
if number_of_task <= number_of_child_proceses:
processes = [Process(target=square, args=(queue.get(),)) for _ in
range(number_of_task)]
for p in processes:
p.start()
p.join()
else:
processes = [Process(target=square, args=(queue.get(),)) for _ in range(number_of_child_proceses)]
for p in processes:
p.start()
# update count of remaining task
number_of_task = queue.qsize()
# run the program in a loop until no more task remains in the queue
while number_of_task:
current_process = psutil.Process()
children = current_process.children()
# if children process have completed assigned task but there is still more remaining tasks in the queue,
# assign them more tasks
if not len(children) and number_of_task:
print(f'\nAssigned tasks completed... reasigning the remaining {number_of_task} task(s) in the queue\n')
main()
# exit the loop if no more task in the queue to work on
print('\nAll tasks completed!!')
exit()
if __name__ == "__main__":
main()
Upvotes: 0