Reputation: 894
Using multiprocessing.Pool, I can split an input list for a single function so that it is processed in parallel across multiple CPUs. Like this:
from multiprocessing import Pool

def f(x):
    return x*x

if __name__ == '__main__':
    pool = Pool(processes=4)
    results = pool.map(f, range(100))
    pool.close()
    pool.join()
However, this does not allow running different functions on different processors. What if I want to do something like this, in parallel / simultaneously:
foo1(args1) --> Processor1
foo2(args2) --> Processor2
How can this be done?
Edit: After Darkonaut's remarks: I do not care about specifically assigning foo1 to processor number 1. It can be any processor chosen by the OS. I am just interested in running independent functions in different, parallel processes. So rather:
foo1(args1) --> process1
foo2(args2) --> process2
Upvotes: 1
Views: 2374
Reputation: 1552
I usually find it easiest to use the concurrent.futures module for concurrency. You can achieve the same with multiprocessing, but concurrent.futures has (IMO) a much nicer interface.
Your example would then be:
from concurrent.futures import ProcessPoolExecutor

def foo1(x):
    return x * x

def foo2(x):
    return x * x * x

if __name__ == '__main__':
    with ProcessPoolExecutor(2) as executor:
        # these return immediately and are executed in parallel, on separate processes
        future_1 = executor.submit(foo1, 1)
        future_2 = executor.submit(foo2, 2)
    # get results / re-raise exceptions that were thrown in workers
    result_1 = future_1.result()  # contains foo1(1)
    result_2 = future_2.result()  # contains foo2(2)
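If you have more than two independent functions, each with its own arguments, the same submit pattern can be wrapped in a small helper. The following is only a sketch: run_all, the task labels, and the two-argument foo2 are hypothetical names made up for illustration, not part of concurrent.futures. It uses as_completed to collect results in whatever order the workers finish.

```python
from concurrent.futures import ProcessPoolExecutor, as_completed


def foo1(x):
    return x * x


def foo2(x, y):
    # hypothetical second function with a different signature
    return x + y


def run_all(tasks, executor_cls=ProcessPoolExecutor, max_workers=2):
    """Submit (name, function, args) triples to a pool and return {name: result}."""
    results = {}
    with executor_cls(max_workers=max_workers) as executor:
        # map each future back to its label so results can be matched to tasks
        future_to_name = {
            executor.submit(func, *args): name for name, func, args in tasks
        }
        # as_completed yields futures as they finish, not in submission order
        for future in as_completed(future_to_name):
            # .result() re-raises any exception thrown in the worker
            results[future_to_name[future]] = future.result()
    return results


if __name__ == '__main__':
    tasks = [('foo1', foo1, (3,)), ('foo2', foo2, (1, 2))]
    print(run_all(tasks))
```

The executor_cls parameter is there so you can swap in ThreadPoolExecutor (for IO-bound work or quick testing) without changing the rest of the code.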
If you have many inputs, it is better to use executor.map with the chunksize argument instead:
from concurrent.futures import ProcessPoolExecutor

def foo1(x):
    return x * x

def foo2(x):
    return x * x * x

if __name__ == '__main__':
    with ProcessPoolExecutor(4) as executor:
        # these return immediately and are executed in parallel, on separate processes
        future_1 = executor.map(foo1, range(10000), chunksize=100)
        future_2 = executor.map(foo2, range(10000), chunksize=100)
        # executor.map returns an iterator which we have to consume to get the results
        result_1 = list(future_1)  # contains [foo1(x) for x in range(10000)]
        result_2 = list(future_2)  # contains [foo2(x) for x in range(10000)]
Note that the optimal values for chunksize and the number of processes, and whether process-based concurrency actually leads to increased performance at all, depend on many factors:
- The runtime of foo1 / foo2. If they are extremely cheap (as in this example), the communication overhead between processes might dominate the total runtime.
- Spawning a process takes time, so the code inside with ProcessPoolExecutor needs to run long enough for this to amortize.
- The actual number of physical processors in the machine you are running on.
- Whether your application is IO bound or compute bound.
- Whether the functions you call inside foo1 / foo2 are already parallelized (such as some np.linalg solvers, or scikit-learn estimators).
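Because of these factors, the most reliable approach is to measure on your own workload. Below is a rough sketch of such a comparison; run_serial, run_parallel, and timed are hypothetical helper names, and the chunksize values are arbitrary starting points rather than recommendations.

```python
import os
import time
from concurrent.futures import ProcessPoolExecutor


def foo1(x):
    return x * x


def run_serial(n):
    # baseline: plain loop in a single process
    return [foo1(x) for x in range(n)]


def run_parallel(n, workers, chunksize):
    # same work, distributed over a process pool in chunks
    with ProcessPoolExecutor(max_workers=workers) as executor:
        return list(executor.map(foo1, range(n), chunksize=chunksize))


def timed(func, *args):
    """Return (result, elapsed seconds) for a single call."""
    start = time.perf_counter()
    result = func(*args)
    return result, time.perf_counter() - start


if __name__ == '__main__':
    n = 10000
    # os.cpu_count() reports logical CPUs, not necessarily physical cores
    workers = os.cpu_count() or 1
    serial, t_serial = timed(run_serial, n)
    print(f'serial: {t_serial:.4f}s')
    for chunksize in (1, 100, 1000):
        parallel, t_parallel = timed(run_parallel, n, workers, chunksize)
        assert parallel == serial  # both paths must compute the same values
        print(f'{workers} workers, chunksize={chunksize}: {t_parallel:.4f}s')
```

For a function as cheap as foo1 here, do not be surprised if the serial version wins; the parallel timing includes pool startup and inter-process communication.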
Upvotes: 2