Reputation: 1800
What I've tried
I have an embarrassingly parallel workload in which I iterate over 90x360 values in two nested for loops and do some computation on each grid-point. I tried dask.delayed
to parallelize the loops as per this tutorial, although that tutorial demonstrates it only for a very small number of iterations.
Problem description
I'm surprised to find that the parallel code took 2 h 39 min compared to 1 h 54 min for the non-parallel version, which means I'm doing something fundamentally wrong, or maybe the task graphs are too big to handle?
Set-up info
This test was done on a subset of my iterations, that is, 10 x 360, but the optimized code should be able to handle 90 x 360 nested iterations. My mini-cluster has 66 cores and 256 GB of RAM, and the two data files are 4 GB and < 1 GB. I'm also confused about whether multi-processing
or multi-threading
is the right approach for this task. I thought running the parallel loops in multiple processes, similar to joblib's
default implementation, would be the way to go, as each loop works on independent grid-points. But this suggests that multi-threading
is faster and should be preferred if one doesn't have a GIL issue (which I don't). So, for the timing above, I used dask.delayed's
default scheduling option, which uses multi-threading within a single process.
Simplified code
import numpy as np
import pandas as pd
import xarray as xr
from datetime import datetime
from dask import compute, delayed
def add_data_from_small_file(lat):
    """ For each grid-point, get time steps from the big file as per the mask,
    and compute data from the small file for those time steps.
    Returns: an array per latitude, which is to be stacked
    """
    # mask1/mask2 are boolean time masks defined elsewhere; the number of
    # selected time steps is assumed constant across grid-points
    temp_per_lon = None
    for lon in range(0, 360):
        # get time steps from the big file
        start_time = big_file.time.values[mask1[:, lat, lon]]
        end_time = big_file.time.values[mask2[:, lat, lon]]
        temp_var = np.empty(len(start_time))
        for i, (t1, t2) in enumerate(zip(start_time, end_time)):
            # calculate value from the small file for each time pair
            temp_var[i] = small_file.sel(t=slice(t1, t2)).median()
        if temp_per_lon is None:
            temp_per_lon = np.empty((len(temp_var), 360))
        temp_per_lon[:, lon] = temp_var
    return temp_per_lon
if __name__ == '__main__':
t1 = datetime.now()
small_file = xr.open_dataarray('small_file.nc') # size < 1 GB, 10000x91
big_file = xr.open_dataset('big_file.nc') # size = 4 GB, 10000x91x360
delayed_values = [delayed(add_data_from_small_file)(lat) for lat in range(0,10)] # 10 loops for testing, to scale to 90 loops
# have to delay stacking to avoid memory error
stack_arr = delayed(np.stack)(delayed_values, axis=1)
stack_arr = stack_arr.compute()
    print('Total run time: {}'.format(datetime.now() - t1))
Upvotes: 3
Views: 836
Reputation: 57319
Every delayed task adds about 1ms of overhead. So if your function is slow (maybe you're calling out to some other expensive function), then yes dask.delayed might be a good fit. If not, then you should probably look elsewhere.
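You can see this overhead directly with a micro-benchmark (a sketch, assuming dask is installed; `noop` is a stand-in task):

```python
from datetime import datetime
from dask import compute, delayed

def noop(x):
    return x

# 1000 trivial tasks: the work itself is essentially free, so the
# measured time is almost entirely scheduler overhead (~1 ms per task)
t0 = datetime.now()
results = compute(*[delayed(noop)(i) for i in range(1000)], scheduler="threads")
print('1000 no-op tasks took:', datetime.now() - t0)
```

If each real iteration of your loop takes much longer than that per-task cost, the overhead is negligible; if not, it can dominate.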
If you're curious whether threads or processes are better for your workload, the easiest way to find out is simply to try both:
dask.compute(*values, scheduler="processes")
dask.compute(*values, scheduler="threads")
It could be that, even though you're using numpy arrays, most of your time is actually spent in Python for loops. If so, multithreading won't help you here, and the real solution is to stop using Python for loops, either by being clever with numpy/xarray, or by using a project like Numba.
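As a toy illustration of that last point (not the question's actual workload), here is the same reduction written once with a Python loop and once vectorized:

```python
import numpy as np

data = np.arange(10000.0 * 91).reshape(10000, 91)

# Loop version: 91 Python-level iterations, each dispatching into numpy
loop_medians = np.empty(91)
for j in range(91):
    loop_medians[j] = np.median(data[:, j])

# Vectorized version: one call; the loop over columns runs in C
vec_medians = np.median(data, axis=0)
```

Both produce the same result, but the vectorized form avoids the Python-level loop entirely, and the gap widens as the loop count grows.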
Upvotes: 3