Reputation: 312
I'm running Python 3.9.1
Note: I know there are questions with similar titles. But those questions are buried in complicated code, which makes the underlying problem hard to see. This is a bare-bones implementation of the problem which I think others will find easier to digest.
EDIT: I have Pool(processes=64) in my code. But most others will probably have to change this according to how many cores there are on their computer. And if it takes too long, change listLen to a smaller number.
I'm trying to learn about multiprocessing in order to solve a problem at work. I have a list of arrays on which I need to do a pairwise comparison. For simplicity, I've recreated the gist of the problem with simple integers instead of arrays and an addition function instead of a call to some complicated comparison function. With the code below, I'm running into the titular error:
import time
from multiprocessing import Pool
import itertools
import random
def add_nums(a, b):
    return (a + b)

if __name__ == "__main__":
    listLen = 1000

    # Create a list of random numbers to do pairwise additions of
    myList = [random.choice(range(1000)) for i in range(listLen)]

    # Create a list of all pairwise combinations of the indices of the list
    index_combns = [*itertools.combinations(range(len(myList)), 2)]

    # Do the pairwise operation without multiprocessing
    start_time = time.time()
    sums_no_mp = [*map(lambda x: add_nums(myList[x[0]], myList[x[1]]), index_combns)]
    end_time = time.time() - start_time
    print(f"Process took {end_time} seconds with no MP")

    # Do the pairwise operations with multiprocessing
    start_time = time.time()
    pool = Pool(processes=64)
    sums_mp = pool.map(lambda x: add_nums(myList[x[0]], myList[x[1]]), index_combns)
    end_time = time.time() - start_time
    print(f"Process took {end_time} seconds with MP")

    pool.close()
    pool.join()
Upvotes: 2
Views: 3628
Reputation: 1
Q :
" ... trying to learn about multiprocessing in order to solve a problem at work. "
A :
The single most important piece of experience to learn is how BIG the COSTS of process INSTANTIATION are. All the other add-on overhead costs ( by no means negligible, and growing with the scale of the problem ) are mere details in comparison to this immense, principal one.
Before the answer is read through and completely understood, here is a live GUI-interactive simulator of how much we will have to pay to start using more than 1 stream of process-flow orchestration. Costs vary: lower for threads, larger for MPI-based distributed operations, and highest for multiprocessing processes as used in the Python interpreter, where N-many copies of the main Python interpreter process first get made ( RAM-allocated and spawned by the O/S scheduler ). As of 2022-Q2, less expensive backends that try to avoid this cost still report issues, deadlocking on wrongly shared, ill-copied or forgotten-to-copy, already-held MUTEX-es and similar internals, so even the full copy is not always safe in 2022. Not having met these problems in person does not mean they no longer exist, as documented by countless professionals ( a story about a pool of sharks is a good place to start from ).
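A rough sketch of what such a simulator models ( a generic overhead-aware Amdahl's-law estimate, illustrative only, not the simulator's exact formula ):

def estimated_speedup(p, n_workers, overhead_fraction):
    # p                 : fraction of the work that can run in parallel (0..1)
    # n_workers         : number of spawned processes
    # overhead_fraction : add-on cost of spawning/feeding one worker,
    #                     expressed as a fraction of the parallel work
    serial_part = 1.0 - p
    parallel_part = p / n_workers
    addon_costs = n_workers * overhead_fraction
    return 1.0 / (serial_part + parallel_part + addon_costs)

# Even a modest per-process overhead erases most of the theoretical gain:
print(estimated_speedup(p=0.95, n_workers=64, overhead_fraction=0.0))   # ~15.4x
print(estimated_speedup(p=0.95, n_workers=64, overhead_fraction=0.01))  # ~1.4x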
a ) pickling lambdas ( and many other SER/DES blockers )
is easy to solve: it is enough to conda install dill and import dill as pickle, as dill has for years been able to pickle lambdas ( credit to @MikeMcKearns ), and your code does not need to refactor its use of the plain pickle.dumps() call interface. pathos.multiprocessing defaults to using dill internally, so this long-known multiprocessing SER/DES weakness gets avoided.
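A minimal sketch, assuming the third-party pathos package ( which brings dill along ) is installed; the pool size and the toy lambda are illustrative only:

# pathos serialises with dill, so a lambda can be mapped across worker processes
from pathos.multiprocessing import ProcessingPool as Pool

if __name__ == "__main__":
    myList = list(range(10))
    pool = Pool(nodes=4)
    # dill can serialise the lambda, so no pickling error is raised here
    doubled = pool.map(lambda x: x * 2, myList)
    pool.close()
    pool.join()
    print(doubled)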
b ) performance killers
- multiprocessing.Pool.map() is rather an end-to-end performance anti-pattern here. The costs, once we stop neglecting them, show how many CPU clocks and blocked physical-RAM-I/O transfers are paid for so many process instantiations ( 60+ ), which finally "occupy" almost all physical CPU cores, leaving almost zero room for the indeed high-performance numpy-native multicore computing of the core problem ( for which the ultimate performance was expected to be boosted, wasn't it? ). A small timing sketch after this list makes the proportions visible.
- just move the p-slider in the simulator to anything less than 100% ( having no [SERIAL] part of the problem execution is nice in theory, yet never doable in practice; even the program launch is purely [SERIAL] by design )
- just move the Overhead-slider in the simulator to anything above a plain zero ( it expresses the relative add-on cost of spawning one of the NCPUcores processes, as a percentage of the number of instructions in the [PARALLEL] section ). Mathematically "dense" work has many such "useful" instructions and may, supposing no other performance killers jump out of the box, afford a reasonable amount of "add-on" costs to spawn some number of concurrent or parallel operations ( the actual number depends only on the actual economy of costs, not on how many CPU cores are present, and even less on our "wishes" or scholastic, or even worse, copy/paste "advice" ). On the contrary, mathematically "shallow" work almost always yields "speedups" << 1 ( immense slow-downs ), as there is almost no chance to justify the known add-on costs ( paid on process instantiations and on SER/xfer/DES moving the data in (params) and back (results) )
- next move the Overhead-slider in the simulator to the rightmost edge == 1. This shows the case when the actual process-spawning overhead ( a time lost ) is still no more than <= 1 % of all the computing-related instructions that will next be performed for the "useful" part of the work inside such a spawned process instance. Even such a 1:100 proportion factor ( doing 100x more "useful" work than the CPU time lost on arranging that many copies and making the O/S scheduler orchestrate their concurrent execution inside the available system virtual memory ) already shows all the warnings visible in the graph of the progression of speedup degradation. Just play a bit with the Overhead-slider in the simulator before touching the others...
- avoid the sin of "sharing" ( if performance is the goal ). The costs of operating such orchestration among several, now independent, Python interpreter processes take additional add-on costs that are never justified by any performance boost, as the fight for occupying shared resources ( CPU cores, physical-RAM-I/O channels ) only devastates CPU-core cache re-use hit rates and multiplies O/S-scheduler-operated process context switches, all of which further downgrades the resulting end-to-end performance ( which is something we do not want, do we? )
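The timing sketch promised above, illustrative only ( absolute numbers will differ per machine; the point is the proportion between the spawn cost and the "useful" work ):

import time
from multiprocessing import Pool

def trivial(x):
    return x + 1

if __name__ == "__main__":
    t0 = time.time()
    pool = Pool(processes=8)                    # pay the process-instantiation costs here
    t1 = time.time()
    result = pool.map(trivial, range(10_000))   # pay SER/xfer/DES costs here
    t2 = time.time()
    pool.close()
    pool.join()
    print(f"Pool spawn : {t1 - t0:.4f} s")
    print(f"Trivial map: {t2 - t1:.4f} s")

    t3 = time.time()
    result_serial = [trivial(x) for x in range(10_000)]   # often faster than both
    print(f"Serial loop: {time.time() - t3:.4f} s")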
c ) boosting performance
- respect the facts about the actual costs of any kind of computing operation
- avoid "shallow" computing steps
- maximise what gets so expensively moved into a set of distributed processes ( if a need for them remains at all )
- avoid all overhead-adding operations ( like adding local temporary variables where inline operations permit an in-place store of partial results )
and
- try to use the ultra-performant, cache-line-friendly and optimised, native numpy vectorised multicore and striding-tricks capabilities ( a minimal vectorised sketch follows below ), which are blocked once the CPU cores are pre-overloaded by scheduling so many (~60) Python interpreter process copies, each one trying to call numpy code, thus leaving no free cores on which to actually place such high-performance, cache-reuse-friendly vectorised computing. That is where we gain or lose most of the performance, not in slow-running serial iterators, nor in spawning 60+ process-based full copies of the "__main__" Python interpreter before doing a single piece of useful work on our great data, expensively RAM-allocated and physically copied 60+ times therein.
- refactoring of the real problem shall never go against the collected knowledge about performance, as repeating the things that do not work will not bring any advantage, will it?
- respect your physical platform constraints; ignoring them will degrade your performance
- benchmark, profile, refactor
- benchmark, profile, refactor
- benchmark, profile, refactor
No other magic wand is available here. And once already working on the bleeding edge of performance, set gc.disable() before you spawn the Python interpreter into N-many replicated copies, so as not to wait for spontaneous garbage collections when going for ultimate performance.
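A minimal sketch of the numpy-vectorised direction referred to above, applied to the question's toy pairwise-addition problem ( names and sizes are illustrative only, not taken from the answer ):

import numpy as np

listLen = 1000
myArr = np.random.randint(0, 1000, size=listLen)

# All pairwise sums at once via broadcasting, then keep the upper triangle
# (i < j), which matches itertools.combinations(range(listLen), 2)
pairwise = myArr[:, None] + myArr[None, :]
i, j = np.triu_indices(listLen, k=1)
sums_vectorised = pairwise[i, j]

print(sums_vectorised.shape)   # (499500,) == listLen * (listLen - 1) / 2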
Upvotes: 1
Reputation: 30240
I'm not exactly sure why (though a thorough read through the multiprocessing docs would probably have an answer), but there's a pickling step involved in Python's multiprocessing when certain things are passed to child processes. While I would have expected the lambdas to be inherited rather than passed via pickling, I guess that's not what's happening.
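As a minimal illustration ( not required for the solution below ), the failure can be reproduced with pickle alone, without any multiprocessing:

import pickle

def named_add(a, b):
    return a + b

pickle.dumps(named_add)              # works: module-level functions pickle by reference
try:
    pickle.dumps(lambda a, b: a + b)
except (pickle.PicklingError, AttributeError) as exc:
    print(f"lambda could not be pickled: {exc}")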
Following the discussion in the comments, consider something like this approach:
import time
from multiprocessing import Pool
import itertools
import numpy as np
from multiprocessing import shared_memory
def add_mats(a, b):
    #time.sleep(0.00001)
    return (a + b)

# Helper for mp version
def add_mats_shared(shm_name, array_shape, array_dtype, i1, i2):
    shm = shared_memory.SharedMemory(name=shm_name)
    stacked = np.ndarray(array_shape, dtype=array_dtype, buffer=shm.buf)
    a = stacked[i1]
    b = stacked[i2]
    result = add_mats(a, b)
    shm.close()
    return result

if __name__ == "__main__":
    class Timer:
        def __init__(self):
            self.start = None
            self.stop = None
            self.delta = None

        def __enter__(self):
            self.start = time.time()
            return self

        def __exit__(self, *exc_args):
            self.stop = time.time()
            self.delta = self.stop - self.start

    arrays = [np.random.rand(5,5) for _ in range(50)]
    index_combns = list(itertools.combinations(range(len(arrays)), 2))

    # Helper for non-mp version
    def add_mats_pair(ij_pair):
        i, j = ij_pair
        a = arrays[i]
        b = arrays[j]
        return add_mats(a, b)

    with Timer() as t:
        # Do the pairwise operation without multiprocessing
        sums_no_mp = list(map(add_mats_pair, index_combns))
    print(f"Process took {t.delta} seconds with no MP")

    with Timer() as t:
        # Stack arrays and copy result into shared memory
        stacked = np.stack(arrays)
        shm = shared_memory.SharedMemory(create=True, size=stacked.nbytes)
        shm_arr = np.ndarray(stacked.shape, dtype=stacked.dtype, buffer=shm.buf)
        shm_arr[:] = stacked[:]

        with Pool(processes=32) as pool:
            processes = [pool.apply_async(add_mats_shared, (
                shm.name,
                stacked.shape,
                stacked.dtype,
                i,
                j,
            )) for (i, j) in index_combns]
            sums_mp = [p.get() for p in processes]

        shm.close()
        shm.unlink()
    print(f"Process took {t.delta} seconds with MP")

    for i in range(len(sums_no_mp)):
        assert (sums_no_mp[i] == sums_mp[i]).all()
    print("Results match.")
It uses multiprocessing.shared_memory to share a single numpy (N+1)-dimensional array (instead of a list of N-dimensional arrays) between the host process and child processes.
Other things that are different but don't matter:
- Pool is used as a context manager to prevent having to explicitly close and join it.
- Timer is a simple context manager to time blocks of code.
- pool.map is replaced with calls to pool.apply_async
pool.map would be fine too, but you'd want to build the argument list before the .map call and unpack it in the worker function, e.g.:
with Pool(processes=32) as pool:
    args = [(
        shm.name,
        stacked.shape,
        stacked.dtype,
        i,
        j,
    ) for (i, j) in index_combns]
    sums_mp = pool.map(add_mats_shared, args)

# and

# Helper for mp version
def add_mats_shared(args):
    shm_name, array_shape, array_dtype, i1, i2 = args
    shm = shared_memory.SharedMemory(name=shm_name)
    ....
Upvotes: 1
Reputation: 384
Python cannot pickle lambda functions. Instead, define a named function and pass that to pool.map. Here is how you might approach this:
import itertools
import random
import time
from multiprocessing import Pool
def add_nums(a, b):
    return a + b

def foo(x):
    return add_nums(x[0], x[1])

if __name__ == "__main__":
    listLen = 1000

    # Create a list of random numbers to do pairwise additions of
    myList = [random.choice(range(1000)) for i in range(listLen)]

    # Create a list of all pairwise combinations of the indices of the list
    index_combns = [
        (myList[i[0]], myList[i[1]])
        for i in itertools.combinations(range(len(myList)), 2)
    ]

    # Do the pairwise operation without multiprocessing
    start_time = time.time()
    sums_no_mp = [*map(foo, index_combns)]
    end_time = time.time() - start_time
    print(f"Process took {end_time} seconds with no MP")

    # Do the pairwise operations with multiprocessing
    start_time = time.time()
    pool = Pool(processes=64)
    sums_mp = pool.map(foo, index_combns)
    end_time = time.time() - start_time
    print(f"Process took {end_time} seconds with MP")

    pool.close()
    pool.join()
I modified index_combns to hold the values from myList directly, because myList will not be accessible from foo, and passing in multiple copies of myList would increase the space complexity of your script.
Running this prints:
Process took 0.053926944732666016 seconds with no MP
Process took 0.4799039363861084 seconds with MP
Upvotes: 1