Reputation: 2054
Let's consider a large dataset recorded as a set of netCDF files, like the one created by the following code:
import xarray as xr
import numpy as np
import dask
from scipy.stats import median_absolute_deviation
import os

n_epochs = 200
n_times = 1500
n_series = 10000

def some_process_generating_sequential_data_epochs():
    return np.random.rand(n_series, n_times)

os.mkdir("temp")

# writes 200 files of 120.1 MB each
for no_epoch in range(n_epochs):
    array = xr.DataArray(some_process_generating_sequential_data_epochs()[:, :, None],
                         dims=("series", "times", "epochs"),
                         coords={"series": range(n_series),
                                 "times": range(n_times),
                                 "epochs": [no_epoch]})
    array.chunk({"series": 1000, "times": 500, "epochs": 1})\
         .to_netcdf("temp/dummy_file_{}.nc".format(no_epoch))
Now, if we want to apply an aggregating function across the epochs dimension (i.e., the dimension along which the dataset is split into files), we can use
dataset = xr.open_mfdataset("temp/dummy_file_*.nc",
                            chunks={"series": 1000, "times": 500, "epochs": 1},
                            concat_dim='epochs', combine='by_coords')
dataset.reduce(np.std, "epochs", allow_lazy=True)\
       .compute().to_netcdf("temp/std.nc")
Explicitly specifying the chunks is important; otherwise dask will load whole files, which will not fit in memory [1]:
By default, chunks will be chosen to load entire input files into memory at once.
This apparently works fine with the numpy mean, std, and median functions. However, it does not work with other functions (e.g., scipy.stats.median_absolute_deviation), which try to load the whole dataset and trigger a MemoryError. I suspect this has to do with a 5-year-old issue on dask's GitHub. In this ticket, they report:
The problem arises when you have a large array interact with the mean, like the following:
(x - x.mean()).sum().compute()
The current workaround is to explicitly compute the mean beforehand
(x - x.mean().compute()).sum().compute()
My attempts to apply such a strategy by modifying the scipy.stats version of median_absolute_deviation were unsuccessful. They also propose using dask.set_options(split_every=2) (which is now deprecated in favour of dask.config.set(split_every=2)), but it does not seem to help here.
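To make this concrete, the kind of rewrite I have in mind looks roughly like the following sketch, which precomputes the inner median before taking the outer one (the variable names and the 1.4826 scale factor, scipy's default, are assumptions on my part):

# Precompute the inner median eagerly, then take the outer median lazily.
center = dataset.reduce(np.median, "epochs", allow_lazy=True).compute()
# 1.4826 matches scipy's default scale factor (assumption).
mad = abs(dataset - center).reduce(np.median, "epochs", allow_lazy=True) * 1.4826
mad.compute().to_netcdf("temp/mad.nc")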
Is there a proper, head-scratch-free, and more-or-less always valid idiom for performing these reduce operations on this kind of dataset?
Upvotes: 0
Views: 844
Reputation: 57281
Your question was quite long, so I didn't read it all (my apologies, I try to answer these questions, but there are many of them).
I think that you're asking ...
How do I apply an arbitrary reduction function onto a large dataset?
The answer is that you can't. Operations like da.mean are rewritten with parallel algorithms. You can't just take the NumPy or SciPy version and apply it as-is. However, if you can do a bit of thinking about how to break down your reduction operation to operate in parallel, then you might want to try out the da.reduction function.
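For illustration, here is a minimal sketch of da.reduction that rebuilds a mean along the last axis, assuming a dask array shaped like the dataset in the question (the array, chunk sizes, and function names are made up for the example):

import dask.array as da

# Illustrative array shaped like the question's data, chunked along the last
# ("epochs") axis.
x = da.random.random((10000, 1500, 200), chunks=(1000, 500, 1))
n = x.shape[-1]

def chunk_sum(block, axis, keepdims, **kwargs):
    # Applied independently to every block.
    return block.sum(axis=axis, keepdims=keepdims)

def combine_sum(parts, axis, keepdims, **kwargs):
    # Applied to concatenated partial results during the tree reduction.
    return parts.sum(axis=axis, keepdims=keepdims)

def aggregate_mean(parts, axis, keepdims, **kwargs):
    # Applied once at the end to produce the final result.
    return parts.sum(axis=axis, keepdims=keepdims) / n

mean = da.reduction(x, chunk_sum, aggregate_mean, combine=combine_sum,
                    axis=-1, dtype=x.dtype)
mean.compute()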
Alternatively, if you're applying a reduction along only one axis, then you could rechunk your data so that that axis is single-chunked, and then use something like map_blocks
to apply your function in an embarrassingly parallel way.
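A minimal sketch of that alternative, under the same assumptions as above (the rechunk sizes are illustrative; shrinking the other axes keeps each block small once the reduced axis is a single chunk):

import dask.array as da
from scipy.stats import median_absolute_deviation

x = da.random.random((10000, 1500, 200), chunks=(1000, 500, 1))

# Make the reduced axis single-chunked; shrink the first axis so each block
# stays reasonably small (100 * 500 * 200 * 8 bytes = 80 MB).
x = x.rechunk({0: 100, 1: 500, 2: -1})

# Each block now holds the full epochs axis, so the unmodified scipy function
# can be applied to every block independently.
mad = x.map_blocks(median_absolute_deviation, axis=2, drop_axis=2, dtype=x.dtype)
mad.compute()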
Upvotes: 1