Reputation: 1307
This question is possibly related to this one. I have 4D numpy array and would like to apply a function to each 2D slice across the first two dimensions. I have implemented the analysis for both dask and joblib. joblib takes only 0.10 minutes, the dask variant takes 8 minutes. Can somebody explain me why?
Here's an MRE:
import numpy as np
import pandas as pd
import time
import dask.array as da
import statsmodels.formula.api as smf
from joblib import Parallel,delayed
from statsmodels.tools.sm_exceptions import ConvergenceWarning
import warnings
warnings.simplefilter('ignore', ConvergenceWarning)
###############################################################################
## Preparation (only to create data)
###############################################################################
rng = np.random.RandomState(42)
def generate_4d_array(rng,time_steps=10,brain_regions=15,transitions=50, subjects=200,
time_autocorr=0.8, subject_autocorr=0.6, dtype=np.float32):
"""
Generates a 4D array with temporal and subject-based autocorrelation,
and performs Z-score normalization over axis 1 (brain regions).
Parameters:
- rng (np.random.RandomState): RandomState instance for reproducibility (mandatory).
- time_steps (int): Number of time points (default is 100).
- brain_regions (int): Number of brain regions (default is 300).
- transitions (int): Number of transitions per subject (default is 50).
- subjects (int): Number of subjects (default is 100).
- time_autocorr (float): Temporal autocorrelation coefficient (0 < time_autocorr < 1, default is 0.8).
- subject_autocorr (float): Subject autocorrelation coefficient (0 < subject_autocorr < 1, default is 0.6).
- dtype (data type): Data type of the array (default is np.float32).
Returns:
- numpy.ndarray: 4D array with shape (time_steps, brain_regions, transitions, subjects),
Z-score normalized over brain regions (axis 1).
"""
# Generate base signals for subjects
subject_base_signal = rng.normal(size=(subjects, brain_regions)).astype(dtype) * 2
# Precompute random noise for efficiency
noise = rng.normal(scale=0.5, size=(time_steps, brain_regions, transitions, subjects)).astype(dtype)
transition_noise = rng.normal(scale=0.5, size=(transitions, subjects, brain_regions)).astype(dtype)
# Initialize the 4D array
data = np.zeros((time_steps, brain_regions, transitions, subjects), dtype=dtype)
# Populate the 4D array with time series data
for subject in range(subjects):
base_signal = subject_base_signal[subject]
for transition in range(transitions):
transition_signal = base_signal + transition_noise[transition, subject]
time_series = np.zeros((time_steps, brain_regions), dtype=dtype)
time_series[0] = transition_signal + noise[0, :, transition, subject]
# Temporal autocorrelation generation
for t in range(1, time_steps):
time_series[t] = (time_autocorr * time_series[t - 1] +
(1 - time_autocorr) * transition_signal +
noise[t, :, transition, subject])
# Store in the data array
data[:, :, transition, subject] = time_series
# Perform Z-score normalization over axis 1 (brain regions)
mean = np.mean(data, axis=1, keepdims=True) # Mean of brain regions
std = np.std(data, axis=1, keepdims=True) # Standard deviation of brain regions
data = (data - mean) / std # Z-score normalization
return data
data = generate_4d_array(rng)
# how big is whole array and how big is each chunk?
array_gib = data.nbytes / (1024 ** 3)
chunk_gib = data[0, 0, :, :].nbytes / (1024 ** 3)
print(f"Memory occupied by array: {array_gib} GiB, Memory occupied by chunk: {chunk_gib} GiB")
###############################################################################
## With joblib
###############################################################################
def mixed_model(a):
'''Runs one mixed model for one region and one point in time over
all subjects and state transitions'''
# make a dataframe out of input chunk
df = pd.DataFrame(a)
df.index.name = 'transition'
df.columns.name = 'subject'
df = df.melt(ignore_index=False)
# run mixel model
result = smf.mixedlm("value ~ 1", df, groups=df["subject"]).fit(method=["cg"])
t_stat = result.tvalues['Intercept']
return t_stat
def mixed_models(data):
'''Compute mixed-model for every region and every point in time'''
time_points, regions = data.shape[:2]
result = np.zeros((time_points,regions)) # Result matrix (1001x376)
result = Parallel(n_jobs=-1)(delayed(mixed_model)(data[i,j])
for i in range(time_points)
for j in range(regions))
result = np.array(result).reshape((time_points, regions))
return result
start = time.time()
result_matrix_1 = mixed_models(data)
print(f"Took {(time.time() - start) / 60} minutes")
###############################################################################
## With Dask
###############################################################################
def mixed_model(chunk):
'''Runs one mixed model for one region and one point in time over
all subjects and state transitions'''
# make an array out of chunk
X = chunk[0,0,:,:]
# make a dataframe out of input chunk
X = pd.DataFrame(X)
X.index.name = 'transition'
X.columns.name = 'subject'
X = X.melt(ignore_index=False)
# run mixel model
result = smf.mixedlm("value ~ 1", X, groups=X["subject"]).fit(method=["cg"])
t_stat = result.tvalues['Intercept']
# return single value array
t_stat = np.array(t_stat)[None,None]
return t_stat
def mixed_models(data):
# map function to each chunk and compute
result_matrix = data.map_blocks(mixed_model,drop_axis=[2,3]).compute()
return result_matrix
start = time.time()
# convert to dask array (overwrite to not occupy RAM twice)
data = da.from_array(data,chunks=(1,1,data.shape[2],data.shape[3]))
# map function to each chunk and compute
result_matrix_2 = mixed_models(data)
print(f"Took {(time.time() - start) / 60} minutes")
###############################################################################
## Compare outputs
###############################################################################
print(f"Outputs are equal: {np.array_equal(result_matrix_1,result_matrix_2)}")
Upvotes: 0
Views: 43