Reputation: 412
I'm trying to use scipy.optimize.curve_fit on a large latitude/longitude/time xarray using dask.distributed as computing backend.
The idea is to run an individual data fitting for every (latitude, longitude) using the time series.
All of this runs fine outside xarray/dask. I tested it using the time series of a single location passed as a pandas dataframe. However, if I try to run the same process on the same (latitude, longitude) directly on the xarray, the curve_fit operation returns the initial parameters.
I am performing this operation using xr.apply_ufunc
like so (here I'm providing only the code that is strictly relevant to the problem):
# function to perform the fit
def _fit_rti_curve(data, data_rti, fit, loc=False):
fit_func, linearize, find_init_params = _get_fit_functions(fit)
# remove nans
x, y = _filter_nodata(data_rti, data)
# remove outliers
x, y = _filter_for_outliers(x, y, linearize=linearize)
# find a first guess for maximum achieveable value
yscale = np.max(y) * 1.05
# find a first guess for the other parameters
# here loc can be manually passed if you have a good estimation
init_parms = find_init_params(x, y, yscale, loc=loc, linearize=linearize)
# fit the curve and return parameters
parms = curve_fit(fit_func, x, y, p0=init_parms, maxfev=10000)
parms = parms[0]
return parms
# shell around _fit_rti_curve
def find_rti_func_parms(data, rti, fit):
# sort and fit highest n values
top_data = np.sort(data)
top_data = top_data[-len(rti):]
# convert to float64 if needed
top_data = top_data.astype(np.float64)
rti = rti.astype(np.float64)
# run the fit
parms = _fit_rti_curve(top_data, rti, fit, loc=0) #TODO maybe add function to allow a free loc
return parms
# call for the apply_ufunc
# `fit` is a string that defines the distribution type
# `rti` is an array for the x values
parms_data = xr.apply_ufunc(
find_rti_func_parms,
xr_obj,
input_core_dims=[['time']],
output_core_dims=[[fit + ' parameters']],
output_sizes = {fit + ' parameters': len(signature(fit_func).parameters) - 1},
vectorize=True,
kwargs={'rti':return_time_interval, 'fit':fit},
dask='parallelized',
output_dtypes=['float64']
)
My guess would be that is a problem related to threading, or at least some shared memory space that is not properly passed between workers and scheduler. However, I am just not knowledgeable enough to test this within dask.
Any idea on this problem?
Upvotes: 1
Views: 1124
Reputation: 346
You should have a look at this issue https://github.com/pydata/xarray/issues/4300 I had the same problem and I solved using apply_ufunc. It is not optimized, since it has to perform rechunking operations, but it works! I've created a GitHub Gist for it https://gist.github.com/clausmichele/8350e1f7f15e6828f29579914276de71
Upvotes: 2
Reputation: 781
This previous answer might be helpful? It's using numpy.polyfit
but I think the general approach should be similar.
Applying numpy.polyfit to xarray Dataset
Also, I haven't tried it but xr.polyfit()
just got merged recently! Could also be something to look into. http://xarray.pydata.org/en/stable/generated/xarray.DataArray.polyfit.html#xarray.DataArray.polyfit
Upvotes: 1