Michael

Reputation: 7377

multiprocessing stop spawning new processes when task finished

Hello, I have a multiprocessing program like this:

# this is pseudocode-ish
import multiprocessing

def worker(queue, context):
    set_context(context)  # set global context within this worker
    while queue.qsize() > 0:
        process(queue.get(False))

pool = multiprocessing.Pool(20, worker, (queue, global_context))
pool.close()
pool.join()

The problem is that the global context is a very heavy object, so spawning each individual process (pickling/unpickling it) takes a while. What I've been finding is that for shorter queues, the whole queue gets processed by the first couple of spawned processes, and then the rest of the program is stuck spawning the remaining processes, which inevitably do nothing because there is nothing left in the queue. E.g. each process takes 1 second to spawn, but the queue is processed in 2 seconds, so the first two processes finish the queue in 2-3 seconds and then the rest of the program takes 17 seconds to spawn the remaining processes.

Is there a way to kill the remaining processes once the queue is empty? Or a more flexible way to set the number of processes in the pool - e.g. only spawn another process when needed?

Thanks

Upvotes: 3

Views: 1321

Answers (1)

Thomas Moreau

Reputation: 4467

There is no way to spawn processes on the fly with multiprocessing.Pool. You would need to modify it yourself if you want this type of behavior.

For the shutdown, one way is to use the multiprocessing.Pool.terminate method. But it will probably wait for all the workers to finish their initialization.

You can also directly kill all the workers when your work is done. There is a _pool field which contains all the worker Process objects, which you can forcefully terminate. Note that this might cause some weird behavior, as the pool is not intended to be managed externally, and you have to make sure you clean up all the managing threads correctly, which might be tricky.
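A minimal sketch of that idea, assuming the undocumented _pool attribute of CPython's Pool implementation (an internal detail that may change between versions):

# Hedged sketch: relies on Pool's undocumented _pool attribute (CPython internal),
# so it may break between Python versions.
for p in pool._pool:      # _pool is the list of worker Process objects
    if p.is_alive():
        p.terminate()     # forcefully stop the worker
pool.terminate()          # let the Pool clean up its managing threads
pool.join()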

Your design choice is quite unusual: you are duplicating the call_queue. A Pool is supposed to take care of the communication by itself, so you do not need an extra queue. If all the tasks are in task_list and need to be processed by process_task, you could do something like

# this is pseudocode-ish
import multiprocessing

def init(context):
    set_context(context)  # set global context within each worker

pool = multiprocessing.Pool(20, init, (global_context,))
res = pool.map(process_task, task_list)
pool.terminate()
pool.join()

This way you avoid breaking the Pool setup, and it is probably more efficient.

Finally, if you intend to re-use your pool several times and your global_context does not change, you could consider using loky. (DISCLAIMER: I am one of the maintainers of this project.) It allows you to reuse a pool of workers several times in a program without having to set everything up again. One issue is that there is no initializer, as loky follows the API of concurrent.futures, but the initialization can be done using a multiprocessing.Barrier and submitting max_workers initializer jobs. This makes sure each initializer job is run by a different worker and that all workers run the initializer.
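A minimal sketch of that Barrier trick, assuming loky's get_reusable_executor; a Manager-backed Barrier is used here so it can be sent to the workers, and set_context, process_task, global_context and task_list are the user's own objects from the question:

# Hedged sketch of the Barrier-based initialization described above.
import multiprocessing
from loky import get_reusable_executor

def init_worker(barrier, context):
    set_context(context)  # user's own context-setting function
    barrier.wait()        # block until every worker has run this initializer

n_workers = 20
executor = get_reusable_executor(max_workers=n_workers)
manager = multiprocessing.Manager()
barrier = manager.Barrier(n_workers)

# one initializer job per worker; the barrier guarantees they land on distinct workers
init_futures = [executor.submit(init_worker, barrier, global_context)
                for _ in range(n_workers)]
for f in init_futures:
    f.result()

# the executor can now be reused for several rounds of work
results = list(executor.map(process_task, task_list))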

Upvotes: 6
