Johannes Wiesner
Johannes Wiesner

Reputation: 1307

Why is joblib's Parallel delayed faster than dasks map block and compute()

This question is possibly related to this one. I have 4D numpy array and would like to apply a function to each 2D slice across the first two dimensions. I have implemented the analysis for both dask and joblib. joblib takes only 0.10 minutes, the dask variant takes 8 minutes. Can somebody explain me why?

Here's an MRE:

import numpy as np
import pandas as pd
import time
import dask.array as da
import statsmodels.formula.api as smf
from joblib import Parallel,delayed
from statsmodels.tools.sm_exceptions import ConvergenceWarning
import warnings
warnings.simplefilter('ignore', ConvergenceWarning)

###############################################################################
## Preparation (only to create data)
###############################################################################

rng = np.random.RandomState(42)

def generate_4d_array(rng,time_steps=10,brain_regions=15,transitions=50, subjects=200, 
                       time_autocorr=0.8, subject_autocorr=0.6, dtype=np.float32):
    """
    Generates a 4D array with temporal and subject-based autocorrelation, 
    and performs Z-score normalization over axis 1 (brain regions).
    
    Parameters:
    - rng (np.random.RandomState): RandomState instance for reproducibility (mandatory).
    - time_steps (int): Number of time points (default is 100).
    - brain_regions (int): Number of brain regions (default is 300).
    - transitions (int): Number of transitions per subject (default is 50).
    - subjects (int): Number of subjects (default is 100).
    - time_autocorr (float): Temporal autocorrelation coefficient (0 < time_autocorr < 1, default is 0.8).
    - subject_autocorr (float): Subject autocorrelation coefficient (0 < subject_autocorr < 1, default is 0.6).
    - dtype (data type): Data type of the array (default is np.float32).
    
    Returns:
    - numpy.ndarray: 4D array with shape (time_steps, brain_regions, transitions, subjects), 
                      Z-score normalized over brain regions (axis 1).
    """
    
    # Generate base signals for subjects
    subject_base_signal = rng.normal(size=(subjects, brain_regions)).astype(dtype) * 2

    # Precompute random noise for efficiency
    noise = rng.normal(scale=0.5, size=(time_steps, brain_regions, transitions, subjects)).astype(dtype)
    transition_noise = rng.normal(scale=0.5, size=(transitions, subjects, brain_regions)).astype(dtype)

    # Initialize the 4D array
    data = np.zeros((time_steps, brain_regions, transitions, subjects), dtype=dtype)

    # Populate the 4D array with time series data
    for subject in range(subjects):
        base_signal = subject_base_signal[subject]
        for transition in range(transitions):
            transition_signal = base_signal + transition_noise[transition, subject]

            time_series = np.zeros((time_steps, brain_regions), dtype=dtype)
            time_series[0] = transition_signal + noise[0, :, transition, subject]
            
            # Temporal autocorrelation generation
            for t in range(1, time_steps):
                time_series[t] = (time_autocorr * time_series[t - 1] + 
                                  (1 - time_autocorr) * transition_signal + 
                                  noise[t, :, transition, subject])

            # Store in the data array
            data[:, :, transition, subject] = time_series

    # Perform Z-score normalization over axis 1 (brain regions)
    mean = np.mean(data, axis=1, keepdims=True)  # Mean of brain regions
    std = np.std(data, axis=1, keepdims=True)    # Standard deviation of brain regions
    data = (data - mean) / std  # Z-score normalization

    return data

data = generate_4d_array(rng)

# how big is whole array and how big is each chunk?
array_gib = data.nbytes / (1024 ** 3)
chunk_gib = data[0, 0, :, :].nbytes / (1024 ** 3)
print(f"Memory occupied by array: {array_gib} GiB, Memory occupied by chunk: {chunk_gib} GiB")

###############################################################################
## With joblib
###############################################################################

def mixed_model(a):
    '''Runs one mixed model for one region and one point in time over
    all subjects and state transitions'''
    
    # make a dataframe out of input chunk
    df = pd.DataFrame(a)
    df.index.name = 'transition'
    df.columns.name = 'subject'
    df = df.melt(ignore_index=False)
    
    # run mixel model
    result = smf.mixedlm("value ~ 1", df, groups=df["subject"]).fit(method=["cg"])
    t_stat = result.tvalues['Intercept']
    
    return t_stat

def mixed_models(data):
    '''Compute mixed-model for every region and every point in time'''
    
    time_points, regions = data.shape[:2]
    result = np.zeros((time_points,regions))  # Result matrix (1001x376)
    
    result = Parallel(n_jobs=-1)(delayed(mixed_model)(data[i,j]) 
                                    for i in range(time_points) 
                                    for j in range(regions))

    result = np.array(result).reshape((time_points, regions))

    return result


start = time.time()
result_matrix_1 = mixed_models(data)
print(f"Took {(time.time() - start) / 60} minutes")

###############################################################################
## With Dask
###############################################################################

def mixed_model(chunk):
    '''Runs one mixed model for one region and one point in time over
    all subjects and state transitions'''
    
    # make an array out of chunk
    X = chunk[0,0,:,:]
    
    # make a dataframe out of input chunk
    X = pd.DataFrame(X)
    X.index.name = 'transition'
    X.columns.name = 'subject'
    X = X.melt(ignore_index=False)
    
    # run mixel model
    result = smf.mixedlm("value ~ 1", X, groups=X["subject"]).fit(method=["cg"])
    t_stat = result.tvalues['Intercept']
    
    # return single value array
    t_stat = np.array(t_stat)[None,None]
    
    return t_stat

def mixed_models(data):
    
    # map function to each chunk and compute
    result_matrix = data.map_blocks(mixed_model,drop_axis=[2,3]).compute()

    return result_matrix

start = time.time()

# convert to dask array (overwrite to not occupy RAM twice)
data = da.from_array(data,chunks=(1,1,data.shape[2],data.shape[3]))

# map function to each chunk and compute
result_matrix_2 = mixed_models(data)

print(f"Took {(time.time() - start) / 60} minutes")

###############################################################################
## Compare outputs
###############################################################################

print(f"Outputs are equal: {np.array_equal(result_matrix_1,result_matrix_2)}")

Upvotes: 0

Views: 43

Answers (0)

Related Questions