Reputation: 7377
Hello, I have a multiprocessing program like this:
#this is pseudocode-ish
import multiprocessing

def worker(queue, context):
    set_context(context)  # set global context within worker
    while queue.qsize() > 0:
        process(queue.get(False))

pool = multiprocessing.Pool(20, worker, (queue, global_context))
pool.close()
pool.join()
The problem is that global_context is a very heavy object, so spawning each individual process (pickling/unpickling it) takes a while. What I've been finding is that for shorter queues, the whole queue gets processed by the first couple of spawned processes, and then the rest of the program is stuck spawning the remaining processes, which inevitably do nothing because there is nothing left in the queue. E.g. if each process takes 1 second to spawn but the queue is processed in 2 seconds, the first two processes finish the queue in 2-3 seconds and the program then spends another 17 seconds spawning the remaining processes.
Is there a way to kill the rest of the processes once the queue is empty? Or a more flexible way to set up the number of processes in the pool, e.g. only spawning another process when it is actually needed?
Thanks
Upvotes: 3
Views: 1321
Reputation: 4467
There is no way to spawn processes on the fly with multiprocessing.Pool. You would need to modify it yourself if you want this type of behavior.
For the shutdown, one way is to use the multiprocessing.Pool.terminate method, but it will probably wait for all the workers to finish their initialization.
You can also directly kill all the workers once your work is done. I think there is a _pool field which contains all the worker Process objects, and you can forcefully terminate those. Note that this might cause some weird behavior, as it is not intended to be handled externally, and you have to make sure you clean up all the managing threads correctly, which might be tricky.
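For illustration, a minimal sketch of that hack (it relies on _pool, a private CPython implementation detail that may change, and work is just a placeholder task):

import multiprocessing

def work(item):
    return item                 # stand-in for the real processing

if __name__ == "__main__":
    pool = multiprocessing.Pool(4)
    pool.map(work, range(8))    # all the work is done at this point
    # CPython implementation detail: pool._pool is the list of worker Processes
    for proc in pool._pool:
        if proc.is_alive():
            proc.terminate()    # forcefully stop the worker process
    pool.terminate()            # also stop the Pool's managing threads
    pool.join()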
Your design choice is also quite unusual: you are duplicating the call_queue. A Pool is supposed to take care of the communication by itself, so you do not need an extra queue. If all the tasks are in task_list and need to be processed by process_task, you could do something like
#this is pseudocode-ish
def init(context):
    set_context(context)  # set global context within worker

pool = multiprocessing.Pool(20, init, (global_context,))
res = pool.map(process_task, task_list)
pool.terminate()
pool.join()
This avoids breaking the Pool setup and is probably more efficient.
Finally, if you intend to re-use your pool several times and your global_context does not change, you could consider using loky. (DISCLAIMER: I am one of the maintainers of this project.) It allows you to reuse a pool of workers several times in a program without having to set everything up again.
One issue is that there is no initializer, as loky follows the API of concurrent.futures, but the initialization can be done using a multiprocessing.Barrier and submitting max_workers initializer jobs. This makes sure each initializer job is run by a different worker and that all workers run the initializer.
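A rough sketch of that trick, assuming loky's get_reusable_executor entry point and a Manager-backed Barrier (a plain multiprocessing.Barrier cannot be pickled into a submitted job); heavy_context and the PID check are placeholders to keep the sketch self-contained:

import os
from multiprocessing import Manager
from loky import get_reusable_executor

def init_worker(barrier, context):
    # a real program would store `context` in a module-level global of an
    # importable module here, so that later tasks can use it
    barrier.wait()       # block until every worker has started this job
    return os.getpid()

if __name__ == "__main__":
    max_workers = 4
    executor = get_reusable_executor(max_workers=max_workers)
    # a Manager-backed Barrier is picklable, so it can be sent to the workers
    barrier = Manager().Barrier(max_workers)
    heavy_context = {"some": "heavy object"}   # placeholder for global_context
    # one initializer job per worker: no job can finish before all of them
    # have started, so each job necessarily runs in a different worker
    futures = [executor.submit(init_worker, barrier, heavy_context)
               for _ in range(max_workers)]
    assert len({f.result() for f in futures}) == max_workers
    # the warm, initialized workers can now be reused for the real tasks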
Upvotes: 6