tmaric

Reputation: 5477

How to use a Pool in Python multiprocessing to run pairs of processes one after another?

I would like to use Python's multiprocessing.Pool to limit the number of parallel processes, and then run tasks such that two of them always run in sequence, one after another. There is no data exchange between the processes.

This is what I came up with

import multiprocessing
import os
  
def setup():
    print(f"Hello, my PID is {os.getpid()}\n")
    
def compute(): 
    print(f"Fellow, my PID is {os.getpid()}\n")
  
if __name__ == "__main__":
  
    # creating a pool object
    p = multiprocessing.Pool(processes=4)
  
    for i in range(10): # 10 > processes !
        # Setup needs to run first  
        result = p.apply_async(setup)
        # and finish before
        result.wait()
        # compute is called.
        p.apply_async(compute)

I first want all setup tasks to run in parallel (at most 4 at a time). Then I want to make sure that a specific setup has completed before its corresponding compute is called. Is this what the code above does? The output looks right, but with asynchronous output and execution, "looks right" is not a reliable measure.
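One way I could check this, instead of eyeballing the interleaved prints, would be to have the workers return timestamps and compare them in the parent process (just a sketch; the stamped_setup/stamped_compute helpers are illustrative, and it fully serializes the pairs, so it is only meant to verify ordering):

import multiprocessing
import os
import time

def stamped_setup():
    # Record when this setup actually ran in the worker process
    return ("setup", os.getpid(), time.time())

def stamped_compute():
    return ("compute", os.getpid(), time.time())

if __name__ == "__main__":
    p = multiprocessing.Pool(processes=4)
    for i in range(10):
        setup_result = p.apply_async(stamped_setup)
        _, _, t_setup = setup_result.get()  # blocks until this setup has run
        _, _, t_compute = p.apply_async(stamped_compute).get()
        # If the ordering holds, every compute timestamp follows its setup's
        assert t_setup <= t_compute
    p.close()
    p.join()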

Alternatively,

if __name__ == "__main__":
  
    # creating a pool object
    p = multiprocessing.Pool(processes=4)
  
    # Run all setups
    # 10 > 4 available processes !
    setup_results = [p.apply_async(setup) for i in range(10)]
        
    # Run all computes
    compute_results = [p.apply_async(compute) for i in range(10)]

The documentation states:

# launching multiple evaluations asynchronously *may* use more processes
multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)]
print([res.get(timeout=1) for res in multiple_results])

Do I have to worry about oversubscribing the machine? How can I make sure that at most 4 CPUs are used at any time?

Shouldn't I have something like "wait" after the loop of calls to setup?

I expect the alternative (loop over setup, then loop over compute) to be faster, because there is no wait in between. Is that true?
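(I could presumably also just time both variants with stand-in workloads, something like the sketch below with sleeps in place of real work, but I would still like to understand the expected behavior.)

import multiprocessing
import time

def setup():
    time.sleep(0.1)  # stand-in for real setup work

def compute():
    time.sleep(0.1)  # stand-in for real computation

def paired(p):
    # Variant 1: wait for each setup before submitting its compute
    results = []
    for _ in range(10):
        p.apply_async(setup).wait()  # blocks until this setup is done
        results.append(p.apply_async(compute))
    for r in results:
        r.get()

def batched(p):
    # Variant 2: submit all setups, wait, then submit all computes
    for r in [p.apply_async(setup) for _ in range(10)]:
        r.get()
    for r in [p.apply_async(compute) for _ in range(10)]:
        r.get()

if __name__ == "__main__":
    for variant in (paired, batched):
        with multiprocessing.Pool(processes=4) as p:
            t0 = time.perf_counter()
            variant(p)
            print(variant.__name__, time.perf_counter() - t0)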

Upvotes: 2

Views: 3014

Answers (1)

Booboo

Reputation: 44108

First of all, and this is more of an aside, in your original code you have:

        result = p.apply_async(setup)
        # and finish before
        result.wait()

This can be simplified to:

        return_value = p.apply(setup) # return_value will be None

That is, just use the blocking method apply, which returns the return value of your worker function, setup. Note that because apply blocks until setup returns, the setups in this loop run strictly one at a time regardless of the pool size; only the fire-and-forget apply_async(compute) calls can overlap with later iterations.

Now for your second alternative:

You are calling the non-blocking method apply_async 10 times to submit the setup tasks, of which at most 4 run in parallel. But you are not waiting for those tasks to complete before doing likewise with function compute. So even if you have a pool size of 4, as soon as the first setup task completes, the pool process that had been executing it is free to start working on the compute tasks that are already queued. In general you will therefore end up with a compute task executing while 3 setup tasks are still executing, which is not what you want. Instead:

if __name__ == "__main__":
    # Use the maximum pool size, but no greater than the number of tasks being submitted:
    # pool_size = min(10, multiprocessing.cpu_count())
    pool_size = 4
  
    # creating a pool object
    p = multiprocessing.Pool(processes=pool_size)
  
    # Run all setups
    setup_results = [p.apply_async(setup) for i in range(10)]
    # Wait for the above tasks to complete:
    for setup_result in setup_results:
        setup_result.get() # Get None return value
        
    # Run all computes
    compute_results = [p.apply_async(compute) for i in range(10)]
    for compute_result in compute_results:
        compute_result.get() # Get None return value
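As an aside: if each compute really only needs its own setup to have finished, rather than all of them, one possible variation (just a sketch, which may or may not fit your use case) is to submit the matching compute from the setup task's callback; callbacks run in a thread of the main process:

import multiprocessing
import os

def setup(i):
    print(f"setup {i} in PID {os.getpid()}", flush=True)
    return i

def compute(i):
    print(f"compute {i} in PID {os.getpid()}", flush=True)

if __name__ == "__main__":
    p = multiprocessing.Pool(processes=4)
    compute_results = []

    def submit_compute(i):
        # Runs in the main process as soon as setup(i) has finished
        compute_results.append(p.apply_async(compute, args=(i,)))

    setup_results = [p.apply_async(setup, args=(i,), callback=submit_compute)
                     for i in range(10)]
    for r in setup_results:
        r.get()  # by now each callback has also submitted its compute
    for r in compute_results:
        r.get()
    p.close()
    p.join()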

Returning to the two-phase approach, you can also use the blocking method map. For example:

if __name__ == "__main__":
    # Use the maximum pool size, but no greater than the number of tasks being submitted:
    # pool_size = min(10, multiprocessing.cpu_count())
    pool_size = 4
  
    # creating a pool object
    p = multiprocessing.Pool(processes=pool_size)
  
    # Run all setups
    # (map passes each element of range(10) as an argument, so setup and
    # compute must accept one parameter here, e.g. def setup(i): ...)
    setup_results = p.map(setup, range(10))

    # Run all computes
    compute_results = p.map(compute, range(10))
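In either version it is also good practice to shut the pool down when you are finished with it, either with close() followed by join() or with a with-statement; a minimal sketch of the latter (assuming the one-argument setup/compute from above):

if __name__ == "__main__":
    # The with-statement terminates the pool on exit; since map blocks
    # until all tasks have completed, nothing is cut short here.
    with multiprocessing.Pool(processes=4) as p:
        setup_results = p.map(setup, range(10))
        compute_results = p.map(compute, range(10))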

I have left the pool size at 4 in case you need to artificially restrict the number of parallel tasks.

But note that in both examples I have included commented-out code showing how to size the pool to the number of available CPU cores. This is appropriate when the worker functions are mostly CPU-bound, since a larger pool then buys you nothing; a larger pool only helps when the functions spend a lot of time waiting on I/O or the network. If the processing were mostly I/O, you would probably be better off with a multithreading pool sized to the number of concurrent tasks being submitted, up to some sensible upper bound. Also, regardless of how many cores are available, there is no point in a pool larger than the number of tasks that can execute in parallel, which is 10 in this case; that is why the commented-out code takes the min with 10.
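For that mostly-I/O case, multiprocessing.pool.ThreadPool offers the same interface backed by threads; a minimal sketch (the fetch function is just a stand-in for some I/O-bound work):

from multiprocessing.pool import ThreadPool
import time

def fetch(i):
    # Stand-in for an I/O-bound task (network request, file read, ...)
    time.sleep(0.1)
    return i

if __name__ == "__main__":
    # Threads spend most of their time waiting, so the pool size can match
    # the number of concurrent tasks rather than the number of CPU cores.
    with ThreadPool(10) as tp:
        results = tp.map(fetch, range(10))
    print(results)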

Upvotes: 2
