Arthur Radley
Arthur Radley

Reputation: 45

Why does my multiprocessing code stop working on large data sets?

I'm trying to calculate the Moran's Index for a square matrix of features (Information_Gains_Matrix) and a corresponding square weights matrix (Weights_Matrix). For each feature in Information_Gains_Matrix I would like to calculate the Moran's Index while the Weights_Matrix is fixed.

As such I am trying to use multiprocessing pool.map to go along each feature of the Information_Gains_Matrix. I can make code do this in various ways on small test data sets. However, when I use the actual large data set, the code runs, but then the CPU usage drops to 0%, the process hangs, and nothing is released.

I have tried using globals and shared variables in case it is a memory issue, and I have tried using different queue methodologies in case it can be fixed that way, but I have had no success. The below code is one of these examples that works on small data sets but not on large data sets.

import multiprocessing
from multiprocessing import RawArray, Pool, Lock
from functools import partial 
import numpy as np

## Set up initial fake data

Information_Gains_Matrix = np.random.uniform(0,1,(22000,22000))
Weights_Matrix = np.random.uniform(0,1,(22000,22000))

## Function I want to parallelise.  
def Feature_Moran_Index(Chunks,Wij,N):   
    Moran_Index_Scores = np.zeros(Chunks.shape[0])
    for i in np.arange(Chunks.shape[0]):
        print(Chunks[i]) # Print ind to show it's running
        Feature = Information_Gains_Matrix[Chunks[i],:]    
        X_bar = np.mean(Feature)
        if X_bar != 0:
            Deviance = Feature - X_bar
            Outer_Deviance = np.outer(Deviance,Deviance)
            Deviance2 = Deviance * Deviance
            Denom = np.sum(Deviance2)
            Moran_Index_Scores[i] = (N/Wij) * (np.sum((W * np.ndarray.flatten(Outer_Deviance)))/Denom)
    return Moran_Index_Scores

## Set up chunks inds for each core.
Use_Cores = (multiprocessing.cpu_count()-2)
Chunk_Size = np.ceil(Information_Gains_Matrix.shape[0] / Use_Cores)
Range = np.arange(Information_Gains_Matrix.shape[0]).astype("i")
Chunk_Range = np.arange(Chunk_Size).astype("i")
Chunks = []
for i in np.arange(Use_Cores-1):
    Chunks.append(Range[Chunk_Range])
    Range = np.delete(Range,Chunk_Range)

Chunks.append(Range)

if __name__ == '__main__':
    W = RawArray('d', Information_Gains_Matrix.shape[0] * Information_Gains_Matrix.shape[1])
    W_np = np.frombuffer(W, dtype=np.float64).reshape((Information_Gains_Matrix.shape[0], Information_Gains_Matrix.shape[1]))
    np.copyto(W_np, Weights_Matrix)
    N = Information_Gains_Matrix.shape[0]
    Wij = np.sum(Weights_Matrix)  
    with Pool(processes=Use_Cores) as pool:
        Results = pool.map(partial(Feature_Moran_Index, Wij=Wij,N=N), Chunks)

Moran_Index_Score = np.concatenate(Results)

I have no loyalty to this method, if anyone can help me parallelise the calculation of the Moran's Index across the features in any way, I would greatly appreciate it as I just can't seem to get it to work.

Upvotes: 2

Views: 200

Answers (1)

Ben
Ben

Reputation: 560

In Feature_Moran_Index, Deviance has shape (22000,), and Outer_Deviance has shape (22000, 22000) and uses 3.8GB of RAM.

The quantity

np.sum(W * np.ndarray.flatten(Outer_Deviance))

equals

np.sum(W_np * Outer_Deviance)

equals

Deviance @ W_np @ Deviance

After replacing the first expression with the last one, and removing the definition of Outer_Deviance, your program runs to completion with memory usage of c. 11GB.

Upvotes: 1

Related Questions