Reputation: 1546
The code below is a contrived example that simulates an actual problem of mine, which uses multiprocessing to speed up the code. The code is run on Windows 10 64-bit, Python 3.7.5, and IPython 7.9.0.
The transformation functions (these will be used to transform arrays in main()):
from itertools import product
from functools import partial
from numba import njit, prange
import multiprocessing as mp
import numpy as np
@njit(parallel= True)
def transform_array_c(data, n):
    ar_len= len(data)
    sec_max1= np.empty(ar_len, dtype = data.dtype)
    sec_max2= np.empty(ar_len, dtype = data.dtype)
    for i in prange(n-1):
        sec_max1[i]= np.nan
    for sec in prange(ar_len//n):
        s2_max= data[n*sec+ n-1]
        s1_max= data[n*sec+ n]
        for i in range(n-1,-1,-1):
            if data[n*sec+i] > s2_max:
                s2_max= data[n*sec+i]
            sec_max2[n*sec+i]= s2_max
        sec_max1[n*sec+ n-1]= sec_max2[n*sec]
        for i in range(n-1):
            if n*sec+n+i < ar_len:
                if data[n*sec+n+i] > s1_max:
                    s1_max= data[n*sec+n+i]
                sec_max1[n*sec+n+i]= max(s1_max, sec_max2[n*sec+i+1])
            else:
                break
    return sec_max1
@njit(error_model= 'numpy', cache= True)
def rt_mean_sq_dev(array1, array2, n):
    msd_temp = np.empty(array1.shape[0])
    K = array2[n-1]
    rs_x= array1[0] - K
    rs_xsq = rs_x *rs_x
    msd_temp[0] = np.nan
    for i in range(1,n):
        rs_x += array1[i] - K
        rs_xsq += np.square(array1[i] - K)
        msd_temp[i] = np.nan
    y_i = array2[n-1] - K
    msd_temp[n-1] = np.sqrt(max(y_i*y_i + (rs_xsq - 2*y_i*rs_x)/n, 0))
    for i in range(n, array1.shape[0]):
        rs_x = array1[i] - array1[i-n]+ rs_x
        rs_xsq = np.square(array1[i] - K) - np.square(array1[i-n] - K) + rs_xsq
        y_i = array2[i] - K
        msd_temp[i] = np.sqrt(max(y_i*y_i + (rs_xsq - 2*y_i*rs_x)/n, 0))
    return msd_temp
@njit(cache= True)
def transform_array_a(data, n):
    result = np.empty(data.shape[0], dtype= data.dtype)
    alpharev = 1. - 2 / (n + 1)
    alpharev_exp = alpharev
    e = data[0]
    w = 1.
    if n == 2: result[0] = e
    else:result[0] = np.nan
    for i in range(1, data.shape[0]):
        w += alpharev_exp
        e = e*alpharev + data[i]
        if i > n -3:result[i] = e / w
        else:result[i] = np.nan
        if alpharev_exp > 3e-307:alpharev_exp*= alpharev
        else:alpharev_exp=0.
    return result
The multiprocessing part
def func(tup, data):    #<-------------the function to be run among all
    a_temp= a[tup[2][0]]
    idx1 = a_temp > a[tup[2][1]]
    idx2= a_temp < b[(tup[2][1], tup[1][1])]
    c_final = c[tup[0][1]][idx1 | idx2]
    data_final= data[idx1 | idx2]
    return (tup[0][0], tup[1][0], *tup[2]), c_final[-1] - data_final[-1]

def setup(a_dict, b_dict, c_dict):    #initialize the shared dictionaries
    global a,b,c
    a,b,c = a_dict, b_dict, c_dict
def main(a_arr, b_arr, c_arr, common_len):
    np.random.seed(0)
    data_array= np.random.normal(loc= 24004, scale=500, size= common_len)
    a_size = a_arr[-1] + 1
    b_size = len(b_arr)
    c_size = len(c_arr)
    loop_combo = product(enumerate(c_arr),
                         enumerate(b_arr),
                         (n_tup for n_tup in product(np.arange(1,a_arr[-1]), a_arr) if n_tup[1] > n_tup[0])
                         )
    result = np.zeros((c_size, b_size, a_size -1 ,a_size), dtype = np.float32)
    ###################################################
    #This part simulates the heavy-computation in the actual problem
    a= {}
    b= {}
    c= {}
    for i in range(1, a_arr[-1]+1):
        a[i]= transform_array_a(data_array, i)
        if i in a_arr:
            for j in b_arr:
                b[(i,j)]= rt_mean_sq_dev(data_array, a[i], i)/data_array *j
    for i in c_arr:
        c[i]= transform_array_c(data_array, i)
    ###################################################
    with mp.Pool(processes= mp.cpu_count() - 1,
                 initializer= setup,
                 initargs= [a,b,c]
                 ) as pool:
        mp_res= pool.imap_unordered(partial(func, data= data_array),
                                    loop_combo
                                    )
        for item in mp_res:
            result[item[0]] =item[1]
    return result
if __name__ == '__main__':
    mp.freeze_support()
    a_arr= np.arange(2,44,2)
    b_arr= np.arange(0.4,0.8, 0.20)
    c_arr= np.arange(2,42,10)
    common_len= 440000
    final_res= main(a_arr, b_arr, c_arr, common_len)
For performance reasons, multiple shared "read only" dictionaries are used among all processes to reduce redundant computations (in the actual problem, the total computation time is reduced by 40% after using shared dictionaries among all the processes). However, the RAM usage becomes absurdly high after using shared dictionaries in my actual problem; memory usage on my 6C/12T Windows computer goes from (8.2GB peak, 5.0GB idle) to (23.9GB peak, 5.0GB idle), a little too high a cost to pay for a 40% speed-up.
Is the high RAM usage unavoidable when sharing multiple data structures among processes is a must? What can be done to my code to make it as fast as possible while using as little memory as possible?
Thank you in advance
Note: I tried using imap_unordered() instead of map because I heard it is supposed to reduce the memory usage when the input iterable is large, but I honestly can't see an improvement in the RAM usage. Maybe I have done something wrong here?
EDIT: Following the feedback in the answers, I have changed the heavy-computation part of the code so that it looks less like a dummy and more closely resembles the computation in the actual problem.
Upvotes: 1
Views: 2744
Reputation: 1
High Memory Usage when manipulating shared dictionaries in python multiprocessing run in Windows
It is fair to demystify the problem a bit before we move into details: there are no shared dictionaries in the original code, let alone manipulated ones. Yes, each of a, b, c did get "assigned" a reference to a_dict, b_dict, c_dict, yet none of them is shared; they just get replicated, as multiprocessing does on Windows-class O/S-es. There are no writes "into" the dict-s, just non-destructive reads, each process reading from its own replica.
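To put a rough number on that replication, here is a back-of-the-envelope sketch. It assumes float64 arrays of 440000 elements, the array counts implied by the posted main(), and 11 workers (mp.cpu_count() - 1 on a 12-thread machine); replicated_bytes is a name made up for this illustration. Interpreter, numba and transient pickling buffers come on top of this and are not counted.

```python
import numpy as np

def replicated_bytes(n_arrays, length, dtype=np.float64, n_workers=11):
    """Return (bytes per copy, bytes across the parent plus all workers)."""
    per_copy = n_arrays * length * np.dtype(dtype).itemsize
    return per_copy, per_copy * (n_workers + 1)

# Array counts for the parameters used in the posted main()
a_arr = np.arange(2, 44, 2)          # last value is 42
b_arr = np.arange(0.4, 0.8, 0.20)
c_arr = np.arange(2, 42, 10)
n_a = int(a_arr[-1])                 # a[i] for i = 1 .. a_arr[-1]
n_b = len(a_arr) * len(b_arr)        # b[(i, j)] for i in a_arr, j in b_arr
n_c = len(c_arr)                     # c[i] for i in c_arr

per_copy, total = replicated_bytes(n_a + n_b + n_c, 440_000)
print(f"{per_copy / 1e6:.1f} MB per copy, {total / 1e9:.2f} GB over all processes")
```

The point is only the scaling: whatever sits in the dictionaries is multiplied by the number of processes, so the real problem (with more or larger arrays) grows the same way.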
np.memmap()-s can also put some part of the originally proposed data onto disk space, at the cost of doing so plus bearing a (latency-masked) random-read latency of ~ 10 [ms] instead of the ~ 0.5 [ns] achievable if smart-aligned, vectorised memory-access patterns were designed into the performance hot-spot. Yet no dramatic change of paradigm ought to be expected here, as the "external iterator" prevents almost any smart-aligned cache re-use.
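For what the np.memmap() route could look like, a minimal sketch follows. The helper names dump_to_memmap and open_readonly and the file name are invented for this example; the idea is that a worker receives only the small (path, dtype, shape) handle and maps the file read-only itself.

```python
import os
import tempfile
import numpy as np

def dump_to_memmap(array, path):
    """Write `array` to `path` as a raw binary file and return a small handle."""
    mm = np.memmap(path, dtype=array.dtype, mode="w+", shape=array.shape)
    mm[:] = array
    mm.flush()
    del mm                      # drop the writable mapping
    return path, array.dtype, array.shape

def open_readonly(path, dtype, shape):
    """Map the file read-only; the O/S loads pages lazily on access."""
    return np.memmap(path, dtype=dtype, mode="r", shape=shape)

tmp_dir = tempfile.mkdtemp()
source = np.arange(1000, dtype=np.float64)
handle = dump_to_memmap(source, os.path.join(tmp_dir, "a_2.dat"))
view = open_readonly(*handle)   # what a worker would do with the handle
print(view[:5], view.flags.writeable)
```

Whether this helps depends on the access pattern; func() touches whole arrays through boolean masks, so the pages get pulled in anyway.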
Q : What can be done to my code in order to make it as fast as possible while using as low memory as possible?
The first sin was using an 8 B int64 to store a value that needs only a few bits ( no Qbits here yet ~ all salutes to the Burnaby Quantum R&D Teams ):
for i in c_arr:                                      # ~~ np.arange( 2, 42, 10 )
    np.random.seed( i )                              # ~ yields a deterministic state
    c[i] = np.random.poisson( size = common_len )    # ~ 440.000 int64-s holding small counts
This took 6 (processes) x 440000 x 8 B ~ 0.021 GB per array "smuggled" into all copies of dictionary c, whereas each and every such value is deterministically known and could be generated ALAP (as late as possible) inside the respective target process, just by knowing the value of i (indeed, there is no need to pre-generate and many-times replicate ~ 0.021 GB of data).
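A sketch of that "generate ALAP" idea, assuming the seeded Poisson data from the snippet above; c_for is a hypothetical helper, and the cache is per process, so each worker builds an array at most once per key.

```python
from functools import lru_cache
import numpy as np

@lru_cache(maxsize=None)
def c_for(i, length):
    """Rebuild the array for key `i` locally instead of receiving a copy."""
    rng = np.random.RandomState(i)   # same stream as np.random.seed(i)
    arr = rng.poisson(size=length)
    arr.flags.writeable = False      # the cached object is reused, keep it read-only
    return arr

first = c_for(12, 440_000)
again = c_for(12, 440_000)           # served from the per-process cache
print(first is again, first.nbytes)
```

This only pays off when regenerating is cheap; for the heavy transforms in the edited question it trades RAM for repeated CPU work.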
So far, Windows-class O/S-es lack an os.fork(), so multiprocessing does a full Python copy ( yes, RAM ..., yes, TIME ): it starts as many replicated Python interpreter sessions ( each importing the main module ) as were requested. This process-based separation is done to avoid the GIL-lock ordered, pure-[SERIAL] code execution.
The best next step is to refactor the code so as to minimise the "shallow" ( and expensive ) use of the 6 processes, which are commanded "externally" by a central iterator: the loop_combo "dictator", with ~ 18522 items, repeats the call to a "remotely-dispatched" func( tup, data ) only to fetch a simple "DMA-tuple" ( (x,y,z), value ) and store one value into the central process's float32 result array.
Try to increase the computing "density": re-factor the code in a divide-and-conquer manner, i.e. so that each of the mp.Pool processes computes, in one smooth block, some remarkably sized, dedicated sub-space of the parameter space ( which is here iterated "from outside" ), and the returned blocks of results can then be easily reduced. Performance will only improve by doing this ( best without any form of expensive sharing ).
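One possible shape of such a block-wise split, as a sketch only: split_blocks, process_block, reduce_blocks and the placeholder toy_value are invented for this illustration, and a serial loop stands in for pool.map( process_block, blocks ).

```python
import numpy as np

def split_blocks(items, n_blocks):
    """Split a list of work items into at most n_blocks contiguous blocks."""
    bounds = np.linspace(0, len(items), n_blocks + 1).astype(int)
    return [items[lo:hi] for lo, hi in zip(bounds[:-1], bounds[1:]) if hi > lo]

def toy_value(t):
    """Placeholder for the real per-item computation."""
    return float(t[0] * 10 + t[1])

def process_block(block):
    """Compute a whole block in one call and return one compact result."""
    idx = np.array(block, dtype=np.intp)
    values = np.array([toy_value(t) for t in block], dtype=np.float32)
    return idx, values

def reduce_blocks(shape, block_results):
    """Scatter the per-block results into the final array."""
    result = np.zeros(shape, dtype=np.float32)
    for idx, values in block_results:
        result[tuple(idx.T)] = values
    return result

items = [(i, j) for i in range(3) for j in range(4)]
blocks = split_blocks(items, n_blocks=5)
# Serial stand-in for pool.map(process_block, blocks)
result = reduce_blocks((3, 4), [process_block(b) for b in blocks])
print(len(blocks), result[2, 3])
```

With one block per worker, each process is dispatched once and returns two arrays, instead of thousands of one-value round trips.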
This re-factoring will avoid the parameter pickle/unpickle costs ( add-on overheads ), both the one-time ones ( passing the unique parameter-set values ) and the repetitive ones ( the memory allocation, build-up and pickle/unpickle of an np.arange( 440000 )-sized array, repeated ~ 18522 times due to a poor call-signature design / engineering ).
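The repetitive part of that cost can be estimated by pickling one work item with and without the big array. This is a sketch under the assumption that the array bound via partial( func, data = ... ) travels with every dispatched task; payload_bytes and the sample tuple are made up for the example.

```python
import pickle
import numpy as np

def payload_bytes(obj):
    """Size of the pickled form that would cross the process boundary."""
    return len(pickle.dumps(obj, protocol=pickle.HIGHEST_PROTOCOL))

data = np.arange(440_000, dtype=np.float64)
task_only = ((0, 2), (1, 0.6), (3, 8))   # one small work item
with_data = (task_only, data)            # the same item plus the array

n_tasks = 18_522                         # item count quoted above
small = payload_bytes(task_only)
large = payload_bytes(with_data)
print(f"per task: {small} B vs {large} B")
print(f"over {n_tasks} tasks: {large * n_tasks / 1e9:.1f} GB pickled")
```

Handing the array over once per worker ( for example through the pool initializer, like the dictionaries ) leaves only the small tuple to be pickled per task.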
All these steps will improve your processing efficiency and remove the unnecessary RAM allocations.
Upvotes: 2