Reputation: 3343
This should be my third and final question regarding my attempts to increase the performance of some statistical analysis that I am doing with Python. I have two versions of my code (single core vs. multiprocessing), and I was expecting to gain performance by using multiple cores, since my code has to uncompress/unpack quite a few binary strings. Sadly, I noticed that the performance actually decreased when using multiple cores.
I am wondering if anyone has a possible explanation for what I observe (scroll down to the April 16th update for more information)?
The key part of the program is the function numpy_array (plus decode in the multiprocessing version); a code snippet is below (the full code is accessible via the pastebin links further down):
def numpy_array(data, peaks):
    rt_counter=0
    for x in peaks:
        if rt_counter %(len(peaks)/20) == 0:
            update_progress()
        peak_counter=0
        data_buff=base64.b64decode(x)
        buff_size=len(data_buff)/4
        unpack_format=">%dL" % buff_size
        index=0
        for y in struct.unpack(unpack_format,data_buff):
            buff1=struct.pack("I",y)
            buff2=struct.unpack("f",buff1)[0]
            if (index % 2 == 0):
                data[rt_counter][1][peak_counter][0]=float(buff2)
            else:
                data[rt_counter][1][peak_counter][1]=float(buff2)
                peak_counter+=1
            index+=1
        rt_counter+=1
The multiprocessing version performs this with a set of functions; I will show the two key ones below:
def tonumpyarray(mp_arr):
    return np.frombuffer(mp_arr.get_obj())

def numpy_array(shared_arr,peaks):
    processors=mp.cpu_count()
    with contextlib.closing(mp.Pool(processes=processors,
                                    initializer=pool_init,
                                    initargs=(shared_arr, ))) as pool:
        chunk_size=int(len(peaks)/processors)
        map_parameters=[]
        for i in range(processors):
            counter = i*chunk_size
            chunk=peaks[i*chunk_size:(i+1)*chunk_size]
            map_parameters.append((chunk, counter))
        pool.map(decode,map_parameters)

def decode((chunk, counter)):
    data=tonumpyarray(shared_arr).view(
        [('f0','<f4'), ('f1','<f4',(250000,2))])
    for x in chunk:
        peak_counter=0
        data_buff=base64.b64decode(x)
        buff_size=len(data_buff)/4
        unpack_format=">%dL" % buff_size
        index=0
        for y in struct.unpack(unpack_format,data_buff):
            buff1=struct.pack("I",y)
            buff2=struct.unpack("f",buff1)[0]
            #with shared_arr.get_lock():
            if (index % 2 == 0):
                data[counter][1][peak_counter][0]=float(buff2)
            else:
                data[counter][1][peak_counter][1]=float(buff2)
                peak_counter+=1
            index+=1
        counter+=1
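pool_init is not shown above; it roughly amounts to publishing the shared array as a module-level global so that decode can reach it:

def pool_init(shared_arr_):
    # runs once in every worker process: keep the shared array in a
    # module-level global so that decode() can use it
    global shared_arr
    shared_arr = shared_arr_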
The full program code can be accessed via these pastebin links:
Pastebin for single core version
Pastebin for multiprocessing version
The performance that I am observing with a file containing 239 timepoints and ~180k measurement pairs per timepoint is ~2.5 minutes for the single-core version and ~3.5 minutes for the multiprocessing version.
PS: The two previous questions (my first ever attempts at parallelization):
-- April 16th --
I have been profiling my program with the cProfile library (calling cProfile.run('main()') in __main__), which shows that there is one step slowing everything down:
ncalls tottime percall cumtime percall filename:lineno(function)
23 85.859 3.733 85.859 3.733 {method 'acquire' of 'thread.lock' objects}
The thing that I do not understand here is that thread.lock objects are used in threading (to my understanding) but should not be used in multiprocessing, as each core should run a single thread (besides having its own locking mechanism). So how does this occur, and why does a single call take 3.7 seconds?
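For reference, a minimal sketch of the profiling setup (the output file name and sort key here are just illustrative; main() is the entry point of my program):

import cProfile
import pstats

if __name__ == '__main__':
    cProfile.run('main()', 'profile.out')           # profile the full run
    stats = pstats.Stats('profile.out')
    stats.sort_stats('cumulative').print_stats(20)  # top 20 entries by cumulative time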
Upvotes: 11
Views: 12417
Reputation: 191
As far as the last part of your question goes, the Python docs basically say that multiprocessing.Lock is a clone of threading.Lock. Acquire calls on locks can take a long time because, if the lock is already acquired, they block until the lock is released. This can become a problem when multiple processes are competing for access to the same data, as in your code. Because I can't view your pastebin, I can only guess at exactly what's going on, but most likely your processes are acquiring the lock for long periods of time, which stops other processes from running even when there is plenty of free CPU time. This shouldn't be affected by the GIL, as that only constrains multithreaded applications, not multiprocessed ones.

So, how to fix this? My guess is that you have some sort of lock protecting your shared array that stays locked while the process is doing intensive calculations that take a relatively long time, thereby barring access for the other processes, which subsequently block on their lock.acquire() calls.

Assuming you have enough RAM, I strongly endorse the answer that suggests storing multiple copies of the array in each process's address space. However, just note that passing large data structures through map can cause unexpected bottlenecks, since it requires pickling and unpickling.
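To make the locking point concrete, here is a sketch of the pattern I would aim for (the function names are mine and purely illustrative): do all of the base64/struct work on local data, and take the lock only for the short copy into the shared array:

import base64
import struct

def decode_blob(blob):
    # all the expensive work happens here on purely local data,
    # so no lock needs to be held while it runs
    raw = base64.b64decode(blob)
    values = struct.unpack(">%dL" % (len(raw) // 4), raw)
    floats = [struct.unpack("f", struct.pack("I", v))[0] for v in values]
    # pair the values up the same way your index % 2 branch does
    return list(zip(floats[0::2], floats[1::2]))

def store_blob(shared_arr, data, counter, pairs):
    # only this short bulk copy into the shared structured array runs under
    # the lock, so other workers block briefly instead of for a whole spectrum
    with shared_arr.get_lock():
        data[counter][1][:len(pairs)] = pairs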
Upvotes: 0
Reputation: 1135
Your pastebins are empty.

The problem is that multiprocessing uses fork if it is available (instead of spawning a new Python process). Forked processes share the same environment (file descriptors, for example). Maybe there are some locks among them.

Here is some frustration about that: Multiprocessing or os.fork, os.exec?
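A tiny illustration of the inherited state (with the fork start method, e.g. on Linux, the child gets the very same lock the parent holds):

import multiprocessing as mp

lock = mp.Lock()

def child():
    # with fork this is the same lock object the parent acquired,
    # so the child blocks here until the parent releases it
    with lock:
        print("child finally got the inherited lock")

if __name__ == '__main__':
    lock.acquire()
    p = mp.Process(target=child)
    p.start()
    lock.release()   # until this runs, the forked child is stuck waiting
    p.join()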
Upvotes: 0
Reputation: 40884
Shared data is a well-known cause of slowdowns due to synchronization.
Can you split your data among the processes, or give each process an independent copy? Then your processes would not need to synchronize anything until the moment when all the calculations are done.
Then I'd let the master process join the output of all the worker processes into one coherent set.
The approach may take extra RAM, but RAM is cheap nowadays.
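A minimal sketch of what I mean (the function names are mine, and the inner decoding just mirrors your existing loop, writing to local lists instead of a shared array):

import base64
import struct
import multiprocessing as mp

def decode_chunk(chunk):
    # each worker decodes its slice into plain local data;
    # nothing is shared, so no lock is ever taken
    out = []
    for blob in chunk:
        raw = base64.b64decode(blob)
        values = struct.unpack(">%dL" % (len(raw) // 4), raw)
        pairs = []
        for i, y in enumerate(values):
            value = struct.unpack("f", struct.pack("I", y))[0]
            if i % 2 == 0:
                pairs.append([value, 0.0])
            else:
                pairs[-1][1] = value
        out.append(pairs)
    return out

def decode_all(peaks):
    processors = mp.cpu_count()
    chunk_size = len(peaks) // processors + 1
    chunks = [peaks[i:i + chunk_size] for i in range(0, len(peaks), chunk_size)]
    pool = mp.Pool(processes=processors)
    try:
        # workers run completely independently, each on its own copy of a chunk;
        # the per-chunk results come back to the parent via pickling
        per_worker = pool.map(decode_chunk, chunks)
    finally:
        pool.close()
        pool.join()
    # the master process joins the per-worker output into one coherent list,
    # in the same order as the input peaks
    return [spectrum for part in per_worker for spectrum in part]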
If you ask me, I'm also puzzled by the 3700 ms per thread-lock acquisition. OTOH, profiling may be mistaken about special calls like this.
Upvotes: 2