Fab

Reputation: 1215

Dask Delayed with xarray - compute() result is still delayed

I tried to use Dask and xarray to run some analysis (e.g. an average) over two datasets and then compute the difference between the two results.

This is my code:

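import dask
import xarray as xr
from dask.distributed import LocalCluster

# worker_kwargs, south, north, west and east are defined elsewhere (not shown)
cluster = LocalCluster(n_workers=5, threads_per_worker=3, **worker_kwargs)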

def calc_avg(path):
    ds = xr.open_mfdataset(
        path,
        combine='nested',
        concat_dim="time",
        parallel=True,
        decode_times=False,
        decode_cf=False,
    )
    subset = ds['var'].sel(lat=slice(south, north), lon=slice(west, east))
    return subset.mean(dim='time')

def diff_(x,y):
    return x-y

p1 = "/path/to/first/multi-file/dataset"
p2 = "/path/to/second/multi-file/dataset"

a = dask.delayed(calc_avg)(p1)  
b = dask.delayed(calc_avg)(p2)
total = dask.delayed(diff_)(a,b)
result = total.compute()

The execution time here is 17 s.

However, plotting the result (result.plot()) takes more than 1 min, so it seems that the calculation actually happens when trying to plot the result.

Is this the proper way to use Dask delayed?

Upvotes: 1

Views: 690

Answers (1)

Michael Delgado

Reputation: 15432

You’re wrapping a call to xr.open_mfdataset, which is itself a dask operation, in a delayed function. So when you call total.compute(), you’re executing the functions calc_avg and diff_. However, calc_avg returns a dask-backed DataArray, and so does diff_. So yep, the 17 s task only converts the scheduled delayed graph of calc_avg and diff_ into a scheduled dask.array graph of open_mfdataset and array ops. The data has not been read or averaged at that point, which is why result.plot() then takes more than a minute: plotting is what finally triggers the real computation.
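To see the effect in isolation, here is a minimal sketch that uses plain dask.array instead of open_mfdataset. The function lazy_mean is a hypothetical stand-in for calc_avg, and the array shape and chunking are arbitrary; the only point is that computing a delayed function which returns a dask collection hands back that collection, still lazy.

```python
import dask
import dask.array as da
import numpy as np


def lazy_mean(n):
    # Hypothetical stand-in for calc_avg: builds a dask array and
    # returns it without computing anything.
    return da.ones((n, 4), chunks=(2, 4)).mean(axis=0)


# Wrapped in delayed: compute() runs lazy_mean, but what comes back
# is the lazy dask array that lazy_mean returned.
delayed_result = dask.delayed(lazy_mean)(6).compute()
still_lazy = isinstance(delayed_result, da.Array)

# Without delayed: compute() evaluates the array graph itself and
# returns concrete NumPy data.
direct_result = lazy_mean(6).compute()
is_numpy = isinstance(direct_result, np.ndarray)

print(type(delayed_result).__name__, type(direct_result).__name__)
```

In the delayed case a second compute() would be needed to get actual numbers, which is effectively what result.plot() ends up doing.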

To resolve this, drop the delayed wrappers and simply use the dask.array xarray workflow:

a = calc_avg(p1)  # this is already a dask array because
                  # calc_avg calls open_mfdataset
b = calc_avg(p2)  # so is this
total = a - b     # dask understands array math, so this "just works"
result = total.compute()    # execute the scheduled job
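If you want to check this without the real files, the following is a rough sketch on synthetic data. The helpers make_ds and calc_avg_demo are hypothetical: make_ds builds a small chunked in-memory dataset in place of open_mfdataset, and calc_avg_demo mirrors only the mean(dim='time') step of calc_avg. The dimension sizes and chunking are made up for illustration.

```python
import numpy as np
import xarray as xr


def make_ds(offset):
    # Hypothetical replacement for open_mfdataset: a small dataset
    # chunked along "time", so that "var" is dask-backed.
    data = np.arange(24, dtype=float).reshape(4, 2, 3) + offset
    ds = xr.Dataset({"var": (("time", "lat", "lon"), data)})
    return ds.chunk({"time": 2})


def calc_avg_demo(ds):
    # Mirrors only the averaging step of calc_avg.
    return ds["var"].mean(dim="time")


a = calc_avg_demo(make_ds(0.0))   # lazy, dask-backed DataArray
b = calc_avg_demo(make_ds(1.0))   # lazy, dask-backed DataArray
total = a - b                     # still lazy
result = total.compute()          # now backed by NumPy

print(total.chunks is not None, result.chunks is None)
```

Here total.chunks is set while result.chunks is None, so a single compute() is enough and a later result.plot() has nothing left to calculate.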

See the xarray guide to parallel computing with dask for an introduction.

Upvotes: 1
