mathguy

Reputation: 1546

High Memory Usage when python multiprocessing is run on Windows

The code below is a contrived example that simulates my actual problem, which uses multiprocessing to speed up the code. The code is run on Windows 10 64-bit OS, Python 3.7.5 and IPython 7.9.0.


The transformation functions (these functions will be used to transform arrays in main()):

from itertools import product
from functools import partial

from numba import njit, prange
import multiprocessing as mp
import numpy as np

@njit(parallel= True)
def transform_array_c(data, n):

    ar_len= len(data)

    sec_max1= np.empty(ar_len, dtype = data.dtype)
    sec_max2= np.empty(ar_len, dtype = data.dtype)

    for i in prange(n-1):
        sec_max1[i]= np.nan

    for sec in prange(ar_len//n):
        s2_max= data[n*sec+ n-1]
        s1_max= data[n*sec+ n]

        for i in range(n-1,-1,-1):
            if data[n*sec+i] > s2_max:
                s2_max= data[n*sec+i]
            sec_max2[n*sec+i]= s2_max

        sec_max1[n*sec+ n-1]= sec_max2[n*sec]

        for i in range(n-1):
            if n*sec+n+i < ar_len:
                if data[n*sec+n+i] > s1_max:
                    s1_max= data[n*sec+n+i]
                sec_max1[n*sec+n+i]= max(s1_max, sec_max2[n*sec+i+1])

            else:
                break

    return sec_max1  

@njit(error_model= 'numpy', cache= True)
def rt_mean_sq_dev(array1, array2, n):
    msd_temp = np.empty(array1.shape[0])

    K = array2[n-1]

    rs_x= array1[0] - K
    rs_xsq = rs_x *rs_x

    msd_temp[0] = np.nan

    for i in range(1,n):
        rs_x += array1[i] - K
        rs_xsq += np.square(array1[i] - K)
        msd_temp[i] = np.nan

    y_i = array2[n-1] - K
    msd_temp[n-1] = np.sqrt(max(y_i*y_i + (rs_xsq - 2*y_i*rs_x)/n, 0))

    for i in range(n, array1.shape[0]):
        rs_x = array1[i] - array1[i-n]+ rs_x
        rs_xsq = np.square(array1[i] - K) - np.square(array1[i-n] - K) + rs_xsq
        y_i = array2[i] - K

        msd_temp[i] = np.sqrt(max(y_i*y_i + (rs_xsq - 2*y_i*rs_x)/n, 0))

    return msd_temp 

@njit(cache= True)
def transform_array_a(data, n):
    result = np.empty(data.shape[0], dtype= data.dtype)
    alpharev = 1. - 2 / (n + 1)
    alpharev_exp = alpharev

    e = data[0]
    w = 1.

    if n == 2: result[0] = e
    else:result[0] = np.nan

    for i in range(1, data.shape[0]):
        w += alpharev_exp
        e = e*alpharev + data[i]

        if i > n -3:result[i] = e / w
        else:result[i] = np.nan

        if alpharev_exp > 3e-307:alpharev_exp*= alpharev
        else:alpharev_exp=0.

    return result

The multiprocessing part

def func(tup, data):    # <------------- the function to be run in all worker processes
    a_temp= a[tup[2][0]]

    idx1 = a_temp > a[tup[2][1]]
    idx2= a_temp < b[(tup[2][1], tup[1][1])]

    c_final = c[tup[0][1]][idx1 | idx2]
    data_final= data[idx1 | idx2]

    return (tup[0][0], tup[1][0], *tup[2]), c_final[-1] - data_final[-1]

def setup(a_dict, b_dict, c_dict):    #initialize the shared dictionaries
    global a,b,c
    a,b,c = a_dict, b_dict, c_dict

def main(a_arr, b_arr, c_arr, common_len):
    np.random.seed(0)
    data_array= np.random.normal(loc= 24004, scale=500, size= common_len)

    a_size = a_arr[-1] + 1
    b_size = len(b_arr)
    c_size = len(c_arr)

    loop_combo = product(enumerate(c_arr),
                         enumerate(b_arr),
                         (n_tup for n_tup in product(np.arange(1,a_arr[-1]), a_arr) if n_tup[1] > n_tup[0])
                         )
    result = np.zeros((c_size, b_size, a_size -1 ,a_size), dtype = np.float32) 

    ###################################################
    #This part simulates the heavy-computation in the actual problem

    a= {}
    b= {}
    c= {}

    for i in range(1, a_arr[-1]+1):

        a[i]= transform_array_a(data_array, i)
        if i in a_arr:
            for j in b_arr:
                b[(i,j)]= rt_mean_sq_dev(data_array, a[i], i)/data_array *j


    for i in c_arr:
        c[i]= transform_array_c(data_array, i)

    ###################################################    
    with mp.Pool(processes= mp.cpu_count() - 1,
                 initializer= setup,
                 initargs= [a,b,c]
                 ) as pool:
        mp_res= pool.imap_unordered(partial(func, data= data_array),
                                    loop_combo
                                    )

        for item in mp_res:
            result[item[0]] =item[1]


    return result


if __name__ == '__main__':
    mp.freeze_support()

    a_arr= np.arange(2,44,2)
    b_arr= np.arange(0.4,0.8, 0.20)
    c_arr= np.arange(2,42,10)
    common_len= 440000

    final_res= main(a_arr, b_arr, c_arr, common_len)

For performance reasons, multiple "read only" dictionaries are shared among all processes to reduce redundant computations (in the actual problem, the total computation time dropped by 40% after sharing the dictionaries among the processes). However, the RAM usage becomes absurdly high after introducing the shared dictionaries in my actual problem: memory usage on my 6C/12T Windows machine goes from (8.2 GB peak, 5.0 GB idle) to (23.9 GB peak, 5.0 GB idle), which is too high a cost to pay for a 40% speed-up.

Is the high RAM usage unavoidable when sharing multiple data structures among processes is a must? What can be done to my code to make it as fast as possible while using as little memory as possible?

Thank you in advance


Note: I tried using imap_unordered() instead of map() because I heard it is supposed to reduce memory usage when the input iterable is large, but I honestly can't see any improvement in RAM usage. Maybe I have done something wrong here?
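(A minimal sketch of the difference in question, with purely illustrative toy numbers: map() materialises the full result list before returning, while imap_unordered() yields results lazily as they complete.)

import multiprocessing as mp

def work(x):
    return x * x

if __name__ == '__main__':
    with mp.Pool(4) as pool:
        all_results = pool.map(work, range(100000))             # whole result list held in RAM
        total = 0
        for r in pool.imap_unordered(work, range(100000), chunksize=256):
            total += r                                           # results consumed one by one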


EDIT: Due to the feedback in the answers, I have already changed the heavy-computation part of the code so that it looks less like a dummy and better resembles the computation in the actual problem.

Upvotes: 1

Views: 2744

Answers (1)

user3666197

Reputation: 1

High Memory Usage when manipulating "shared" dictionaries in a python multiprocessing run on Windows

It is fair to demystify the problem a bit before we move into details: there are no shared dictionaries in the original code, let alone manipulated ones. Yes, each of a, b, c does get "assigned" a reference to a_dict, b_dict, c_dict, yet none of them is shared; they just get replicated, which is what multiprocessing does on Windows-class O/S-es. There are no writes "into" the dict-s, only non-destructive reads from each process's own replica.
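A minimal sketch illustrating this replication (the names demo, init, show are made up for the demonstration, not taken from the code above): each worker receives its own pickled copy of whatever is passed via initargs, so its object identity differs from the parent's.

import multiprocessing as mp
import os

shared = None                      # hypothetical module-level name, set per process

def init(d):                       # runs once in each worker: d arrives as a pickled copy
    global shared
    shared = d

def show(_):
    # each worker reports its own object identity for the "shared" dict
    return os.getpid(), id(shared)

if __name__ == '__main__':
    demo = {'k': 42}
    with mp.Pool(2, initializer=init, initargs=(demo,)) as pool:
        print(pool.map(show, range(2)))    # ids differ from the parent's id(demo)
    print(id(demo))                        # the parent's original stays untouched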

Similarly, np.memmap()-s make it possible to put some part of the originally proposed data onto disk space (at the cost of doing so, plus bearing some (latency-masked) random-read latencies of ~ 10 [ms] instead of the ~ 0.5 [ns] that smart-aligned, vectorised memory-access patterns designed into the performance hot-spot would enjoy), yet no dramatic change of paradigm ought to be expected here, as the "external iterator" almost precludes any smart-aligned cache re-use.
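A minimal sketch of the np.memmap() idea, under the assumption that a big read-only array is flushed to a file once in the parent and only re-opened (not copied) inside each worker; the file name and shape are illustrative only:

import numpy as np

# parent: write the big read-only array to disk once
big = np.random.normal(size=440000)
fp = np.memmap('big_array.dat', dtype='float64', mode='w+', shape=big.shape)
fp[:] = big
fp.flush()

# worker: re-open the same file read-only; pages are loaded on demand and can be
# shared via the O/S page-cache instead of being replicated into every process
def worker_view():
    return np.memmap('big_array.dat', dtype='float64', mode='r', shape=(440000,))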

Q : What can be done to my code in order to make it as fast as possible while using as low memory as possible?

The first sin was in using an 8-byte int64 to store one plain bit (no qubits here yet ~ all salutes to the Burnaby Quantum R&D Teams):

for i in c_arr:                                    # ~~ np.arange( 2, 42, 10 )
     np.random.seed( i )                           # ~ yields a deterministic state
     c[i] = np.random.poisson( size = common_len ) # ~ 440.000 int64-s with {0|1}

This took 6 (processes) x 440000 x 8 B ~ 0.021 GB "smuggled" into all the copies of dictionary c, whereas each and every such value is deterministically known and could be generated as late as possible (ALAP) inside the respective target process, just from knowing the value of i (indeed there is no need to pre-generate and replicate ~ 0.021 GB of data many times over).
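A minimal sketch of that idea, assuming the original (pre-edit) c of Poisson-drawn arrays quoted above: only i travels to the worker, and the deterministic array is re-created there on first use (the helper name and cache are illustrative only):

import numpy as np

_c_cache = {}                        # per-process cache, built lazily

def get_c(i, common_len=440000):
    # re-create the deterministic array inside the worker instead of
    # replicating a pre-built copy shipped from the parent process
    if i not in _c_cache:
        np.random.seed(i)            # same seed -> same array in every process
        _c_cache[i] = np.random.poisson(size=common_len)
    return _c_cache[i]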

So far, Windows-class O/S-es lack an os.fork(), so multiprocessing does a full Python copy (yes, RAM..., yes, TIME) of as many replicated Python-interpreter sessions (plus the re-import of the main module) as were requested for the process-based separation (doing that so as to escape the GIL-lock ordered, pure-[SERIAL] code execution).


The best next step: re-factor the code for both efficiency and performance

The best next step is to refactor the code so as to minimise the "shallow" (and expensive) use of the 6 processes that are "externally" commanded by a central iterator (the loop_combo "dictator" with ~ 18522 items, repeating a call to a "remotely dispatched" func( tup, data ) only to fetch back a simple "DMA-tuple" of ( (x,y,z), value ) and store one single value into the central process's result float32 array).

Try to increase the computing "density": refactor the code in a divide-and-conquer manner, i.e. so that each of the mp.Pool processes computes, in one smooth block, a reasonably large, dedicated sub-space of the covered parameter space (which is currently iterated "from outside") and can easily reduce and return its block of results. Performance will only improve by doing this, best without any form of expensive sharing.
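A minimal sketch of this divide-and-conquer layout, assuming the setup() initializer and func() from the question are reused; the chunks() helper, worker_block() and the block size of 500 are illustrative names and values only:

from itertools import islice

def chunks(iterable, size):
    # slice the central loop_combo iterator into contiguous blocks
    it = iter(iterable)
    while True:
        block = list(islice(it, size))
        if not block:
            return
        yield block

def worker_block(combos, data):
    # each task now covers a whole dedicated block of parameter combinations
    # and returns all of its (key, value) results in a single reply
    return [func(tup, data) for tup in combos]

# inside main(), replacing the per-item dispatch:
with mp.Pool(processes=mp.cpu_count() - 1,
             initializer=setup, initargs=[a, b, c]) as pool:
    blocks = pool.imap_unordered(partial(worker_block, data=data_array),
                                 chunks(loop_combo, 500))
    for block in blocks:
        for key, value in block:
            result[key] = value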

This re-factoring will avoid the parameter pickle/unpickle costs (add-on overheads, both the one-time ones for passing the unique parameter-set values and the repetitive ones for the roughly 18522 times repeated memory allocation, build-up and pickle/unpickle of an np.arange( 440000 ), caused by a poor call-signature design / engineering).
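And a minimal sketch of removing that repetitive pickling, under the assumption that setup() is simply extended so the big array travels to each worker exactly once via initargs instead of riding along with every dispatched task:

def setup(a_dict, b_dict, c_dict, data_arr):
    # the big read-only inputs are shipped to each worker exactly once
    global a, b, c, data
    a, b, c, data = a_dict, b_dict, c_dict, data_arr

def func(tup):
    # unchanged logic, but data now comes from the per-process global
    a_temp = a[tup[2][0]]
    idx1 = a_temp > a[tup[2][1]]
    idx2 = a_temp < b[(tup[2][1], tup[1][1])]
    c_final = c[tup[0][1]][idx1 | idx2]
    data_final = data[idx1 | idx2]
    return (tup[0][0], tup[1][0], *tup[2]), c_final[-1] - data_final[-1]

# in main(): no partial() is needed any more
# pool = mp.Pool(..., initializer=setup, initargs=[a, b, c, data_array])
# mp_res = pool.imap_unordered(func, loop_combo)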

All these steps will improve your processing efficiency and reduce the unnecessary RAM allocations.

Upvotes: 2
