Reputation: 1215
I am trying to use Dask and xarray to perform some analysis (e.g. an average) on two datasets and then compute the difference between the two results.
This is my code:
import dask
import xarray as xr
from dask.distributed import Client, LocalCluster

# worker_kwargs and the south/north/west/east bounds are defined elsewhere
cluster = LocalCluster(n_workers=5, threads_per_worker=3, **worker_kwargs)
client = Client(cluster)  # make dask use this cluster

def calc_avg(path):
    ds = xr.open_mfdataset(path, combine='nested', concat_dim="time", parallel=True,
                           decode_times=False, decode_cf=False)
    mean = ds['var'].sel(lat=slice(south, north), lon=slice(west, east)).mean(dim='time')
    return mean

def diff_(x, y):
    return x - y

p1 = "/path/to/first/multi-file/dataset"
p2 = "/path/to/second/multi-file/dataset"
a = dask.delayed(calc_avg)(p1)
b = dask.delayed(calc_avg)(p2)
total = dask.delayed(diff_)(a, b)
result = total.compute()
The execution time here is 17s.
However, plotting the result (result.plot()) takes more than 1 min, so it seems that the calculation actually happens only when I try to plot the result.
Is this the proper way to use Dask delayed?
Upvotes: 1
Views: 690
Reputation: 15432
You’re wrapping a call to xr.open_mfdataset, which is itself a dask operation, in a delayed function. So when you call total.compute(), you’re executing the functions calc_avg and diff_. However, calc_avg returns a dask-backed DataArray, so diff_ returns one as well. So yep, the 17s task converts the scheduled delayed dask graph of calc_avg and diff_ into a scheduled dask.array dask graph of open_mfdataset and array ops. The actual reading and averaging only runs later, when something needs the values, such as result.plot().
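A minimal sketch of this behaviour, assuming dask and xarray are installed; `fake_avg` is a hypothetical stand-in for calc_avg that builds a small dask-backed DataArray with dask.array.full instead of calling open_mfdataset. Computing the delayed graph only runs the Python functions, so the object it returns is still lazy:

```python
import dask
import dask.array as da
import xarray as xr


def fake_avg(value):
    # Hypothetical stand-in for calc_avg: a lazy, dask-backed time mean
    # of a constant field, built without reading any files.
    data = da.full((10, 4, 5), value, chunks=(5, 4, 5))
    arr = xr.DataArray(data, dims=("time", "lat", "lon"))
    return arr.mean(dim="time")


def diff_(x, y):
    return x - y


a = dask.delayed(fake_avg)(1.0)
b = dask.delayed(fake_avg)(3.0)
total = dask.delayed(diff_)(a, b)

result = total.compute()                # runs fake_avg and diff_ ...
print(type(result.data))                # ... but the data is a dask.array.Array
print(dask.is_dask_collection(result))  # True: no values computed yet
```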
To resolve this, drop the delayed wrappers and simply use the dask.array-backed xarray workflow:
a = calc_avg(p1)  # already a dask-backed DataArray, because
                  # calc_avg calls open_mfdataset
b = calc_avg(p2)  # so is this
total = a - b     # dask understands array math, so this "just works"
result = total.compute()  # execute the scheduled job
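A runnable sketch of the same fix on synthetic data, assuming dask and xarray are installed; `fake_avg` is a hypothetical stand-in for calc_avg that builds a lazy constant field with dask.array.full instead of reading files. Everything stays lazy until the single compute() call:

```python
import dask
import dask.array as da
import xarray as xr


def fake_avg(value):
    # Hypothetical stand-in for calc_avg (no open_mfdataset): returns a
    # lazy, dask-backed time mean of a constant field.
    data = da.full((10, 4, 5), value, chunks=(5, 4, 5))
    arr = xr.DataArray(data, dims=("time", "lat", "lon"))
    return arr.mean(dim="time")


a = fake_avg(1.0)   # lazy: only builds a task graph
b = fake_avg(3.0)   # lazy as well
total = a - b       # still lazy; xarray and dask track the arithmetic

result = total.compute()                # the actual work happens here
print(dask.is_dask_collection(result))  # False: result now holds NumPy data
print(float(result.max()))              # -2.0
```

Because result is now backed by in-memory NumPy data, calling result.plot() on it should only draw the figure instead of triggering the computation again.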
See the xarray guide to parallel computing with dask for an introduction.
Upvotes: 1