Light_B

Reputation: 1800

Dask's parallel for loop slower than single core

What I've tried

I have an embarrassingly parallel for loop in which I iterate over 90×360 values in two nested for loops and do some computation. I tried dask.delayed to parallelize the for loops as per this tutorial, although it is only demonstrated there for a very small set of iterations.

Problem description

I'm surprised to find that the parallel code took 2 h 39 min compared to the non-parallel timing of 1 h 54 min, which means I'm doing something fundamentally wrong, or maybe the task graphs are too big to handle?

Set-up info

This test was done on a subset of my iterations, that is, 10 × 360, but the optimized code should be able to handle the full 90 × 360 nested iterations. My mini-cluster has 66 cores and 256 GB of RAM, and the two data files are 4 GB and < 1 GB. I'm also unsure whether multi-processing or multi-threading is the right approach for this task. I thought running the parallel loops in multiple processes, similar to joblib's default implementation, would be the way to go, since each loop works on independent grid points. But this suggests that multi-threading is faster and should be preferred if one doesn't have a GIL issue (which I don't). So, for the timing above, I used dask.delayed's default scheduling option, which uses multi-threading within a single process.

Simplified code

import numpy as np
import pandas as pd
import xarray as xr
from datetime import datetime
from dask import compute, delayed

def add_data_from_small_file(lat):
    """ for each grid-point along this latitude, get time steps from the big
        file as per the masks, and compute data from the small file for those
        time steps
        Returns: array per latitude, which is to be stacked
    """

    for lon in range(0, 360):
        # get start/end time steps from the big file
        # (mask1 and mask2 are boolean masks defined in the full code)
        start_time = big_file.time.values[mask1[:, lat, lon]]
        end_time = big_file.time.values[mask2[:, lat, lon]]

        i = 0
        for t1, t2 in zip(start_time, end_time):
            # calculate value from the small file for each time pair
            # (temp_var is pre-allocated in the full code)
            temp_var[i] = small_file.sel(t=slice(t1, t2)).median()
            i = i + 1

        # (temp_per_lon is pre-allocated in the full code)
        temp_per_lon[:, lon] = temp_var
    return temp_per_lon



if __name__ == '__main__':
    t1 = datetime.now()
    small_file = xr.open_dataarray('small_file.nc') # size < 1 GB, 10000x91
    big_file = xr.open_dataset('big_file.nc') # size = 4 GB, 10000x91x360

    delayed_values = [delayed(add_data_from_small_file)(lat) for lat in range(0,10)] # 10 loops for testing, to scale to 90 loops
    # have to delay stacking to avoid memory error
    stack_arr = delayed(np.stack)(delayed_values, axis=1) 
    stack_arr = stack_arr.compute()
    print('Total run time:{}'.format(datetime.now()-t1))

Upvotes: 3

Views: 836

Answers (1)

MRocklin

Reputation: 57319

Every delayed task adds about 1 ms of overhead. So if your function is slow (maybe because you're calling out to some other expensive function), then yes, dask.delayed might be a good fit. If not, then you should probably look elsewhere.
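As a rough illustration (a sketch, not from the original answer), that overhead is easy to see with a micro-benchmark: the same total work split into thousands of trivially small delayed tasks spends most of its wall time on scheduling, while a handful of coarse tasks does not.

from dask import compute, delayed

def tiny(x):
    return x + 1                     # near-zero work: the ~1 ms overhead dominates each task

def coarse(xs):
    return [x + 1 for x in xs]       # same total work done inside a single task

many = [delayed(tiny)(i) for i in range(10_000)]   # ~10,000 tasks -> seconds of pure overhead
few = [delayed(coarse)(list(range(10_000)))]       # 1 task -> overhead is negligible

compute(*many)   # timing these two calls (e.g. with time.perf_counter) shows the gap
compute(*few)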

If you're curious about whether threads or processes are better for you, the easiest way to find out is simply to try both:

dask.compute(*values, scheduler="processes")
dask.compute(*values, scheduler="threads")
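In the question's code, for example, the same comparison could be made by passing the keyword to the final call, i.e. stack_arr.compute(scheduler="processes") versus stack_arr.compute(scheduler="threads").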

It could be that even though you're using numpy arrays, most of your time is actually spent in Python for loops. If so, multithreading won't help you here, and the real solution is to stop using Python for loops, either by being clever with numpy/xarray, or by using a project like Numba.
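As a rough sketch of the Numba route (hypothetical names, and assuming the small file's values for one latitude and its time axis have been pulled out of xarray into plain numpy arrays, with the times converted to int64 nanoseconds so Numba can compare them), the innermost loop could run in compiled code instead of interpreted Python:

import numpy as np
from numba import njit

@njit
def medians_between(times, values, start_times, end_times):
    # times:   int64 timestamps of the small file (1-D, same length as values)
    # values:  the small file's data for one latitude (1-D)
    # start_times / end_times: int64 window bounds derived from the big file's masks
    out = np.empty(len(start_times))
    for i in range(len(start_times)):
        in_window = (times >= start_times[i]) & (times <= end_times[i])
        out[i] = np.median(values[in_window])
    return out

Whether this beats a fully vectorized numpy/xarray formulation (for example, a groupby over pre-computed window labels) would have to be measured, but either approach removes the per-time-pair .sel(...).median() calls from interpreted Python.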

Upvotes: 3
