LCT

Reputation: 253

With xarray, how to parallelize 1D operations on a multidimensional Dataset?

I have a 4D xarray Dataset. I want to carry out a linear regression between two variables on a specific dimension (here time), and keep the regression parameters in a 3D array (the remaining dimensions). I managed to get the results I want by using this serial code, but it is rather slow:

import numpy as np
import scipy.stats

# NB: x and y hold the names of the two variables to regress;
# sl and inter hold the names of the output variables for slope and intercept.
# add empty arrays to store the results of the regression
res_shape = tuple(v for k, v in ds[x].sizes.items() if k != 'year')
res_dims = tuple(k for k, v in ds[x].sizes.items() if k != 'year')
ds[sl] = (res_dims, np.empty(res_shape, dtype='float32'))
ds[inter] = (res_dims, np.empty(res_shape, dtype='float32'))
# Iterate over the kept dimensions
for lat in ds.coords['latitude']:
    for lon in ds.coords['longitude']:
        for duration in ds.coords['duration']:
            locator = {'longitude': lon, 'latitude': lat, 'duration': duration}
            sel = ds.loc[locator]
            res = scipy.stats.linregress(sel[x], sel[y])
            ds[sl].loc[locator] = res.slope
            ds[inter].loc[locator] = res.intercept

How could I speed-up and parallelize this operation?

I understand that apply_ufunc might be an option (and could be parallelized with dask), but I did not manage to get the parameters right.

The following questions are related but without an answer:

Edit 2: move previous edit to an answer

Upvotes: 8

Views: 1942

Answers (2)

Andrew Williams

Reputation: 771

The previous answer by LCT covers most of what needs to be said here; however, I think it is possible to use dask='parallelized' with multiple outputs like the ones you get from scipy.stats.linregress.

The trick is to stack the multiple outputs into one array and return that. You'll also have to use the output_core_dims kwarg to specify that the DataArray returned by the apply_ufunc() call now has an extra dimension:

import numpy as np
import xarray as xr
from scipy import stats

def new_linregress(x, y):
    # Wrapper around scipy linregress to use in apply_ufunc
    slope, intercept, r_value, p_value, std_err = stats.linregress(x, y)
    return np.array([slope, intercept, r_value, p_value, std_err])

# return a new DataArray with a stacked "parameter" dimension
# (named regress_stats rather than stats, so the scipy.stats module isn't shadowed)
regress_stats = xr.apply_ufunc(new_linregress, ds[x], ds[y],
                               input_core_dims=[['year'], ['year']],
                               output_core_dims=[["parameter"]],
                               vectorize=True,
                               dask="parallelized",
                               output_dtypes=['float64'],
                               output_sizes={"parameter": 5},
                               )

N.B. This approach currently only works with dask='parallelized' if you have dask<2.0, but it seems to work fine for multiple outputs with something else like dask='allowed'. Have a look at this GitHub issue for more detail.

Hope it helps!

Edit: I've been informed that the dask<2.0 issue has been resolved as long as you have xarray>=0.15.0, so you can now use dask='parallelized' to speed things up. :)
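One more note: dask='parallelized' only pays off if the inputs are dask-backed in the first place. A minimal sketch of how you might chunk the data beforehand (the chunk sizes are illustrative only, and the variable names follow the snippet above):

# Chunk along the broadcast dimensions so apply_ufunc can distribute the work;
# keep the 'year' core dimension in a single chunk. Chunk sizes are illustrative only.
ds = ds.chunk({'latitude': 20, 'longitude': 20})
# ...build regress_stats with xr.apply_ufunc as above, then trigger the computation:
regress_stats = regress_stats.compute()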

Upvotes: 6

LCT

Reputation: 253

It is possible to apply scipy.stats.linregress (and other non-ufuncs) to the xarray Dataset using apply_ufunc() by passing vectorize=True like so:

import scipy.stats
import xarray as xr

# names for the five outputs of linregress (these are example names)
array_names = ['slope', 'intercept', 'rvalue', 'pvalue', 'stderr']

# return a tuple of DataArrays, one per output of linregress
res = xr.apply_ufunc(scipy.stats.linregress, ds[x], ds[y],
        input_core_dims=[['year'], ['year']],
        output_core_dims=[[], [], [], [], []],
        vectorize=True)
# add the data to the existing dataset
for arr_name, arr in zip(array_names, res):
    ds[arr_name] = arr

Although still serial, apply_ufunc is around 36x faster than the loop implementation in this specific case.

However, parallelization with dask is not yet implemented for multiple outputs like the ones returned by scipy.stats.linregress:

NotImplementedError: multiple outputs from apply_ufunc not yet supported with dask='parallelized'

Upvotes: 5
