Cursore

Reputation: 33

Xarray Distributed Failed to serialize

I need to upsample some satellite images, organized in a DataArray, using linear interpolation. As long as I run the code locally I have no issues, but when I try to run the same interpolation on a distributed system I get this error:

`Could not serialize object of type tuple`

To reproduce the problem, all that is needed is to switch between a distributed and a local environment. Here is the distributed version of the code.

import numpy as np
import pandas as pd
import xarray as xr
from dask.distributed import Client

# start a local distributed cluster; remove this line to run locally
client = Client()

n_time = 365
px = 2000
lat = np.linspace(19., 4., px)
lon = np.linspace(34., 53., px)
time = pd.date_range('1/1/2019', periods=n_time, freq='D')
data = xr.DataArray(np.random.random((n_time, px, px)),
                    dims=('time', 'lat', 'lon'),
                    coords={'time': time, 'lat': lat, 'lon': lon})
data = data.chunk({'time': 1})

# upsampling
nlat = np.linspace(19., 4., px * 2)
nlon = np.linspace(34., 53., px * 2)
interp = data.interp(lat=nlat, lon=nlon)
computed = interp.compute()

Does anyone have an idea of how to work around the problem?

EDIT 1:

Since my first MRE apparently wasn't clear enough, I have rewritten it using all the input received so far. I need to upsample a satellite dataset from 500 m to 250 m. Because chunking along the dimension being interpolated is not yet supported**, the final goal is to find a workaround that upsamples each image of the 500 m dataset.

import numpy as np
import dask.array as dsa
import pandas as pd
import xarray as xr

px = 2000
n_time = 365
time = pd.date_range('1/1/2019', periods=n_time, freq='D')

# dataset to be upsampled
lat_500 = np.linspace(19., 4., px)
lon_500 = np.linspace(34., 53., px)
da_500 = xr.DataArray(dsa.random.random((n_time, px, px),
                      chunks=(1, 1000, 1000)),
                      dims=('time', 'lat', 'lon'),
                      coords={'time': time, 'lat': lat_500, 'lon': lon_500})

# reference dataset
lat_250 = np.linspace(19., 4., px * 2)
lon_250 = np.linspace(34., 53., px * 2)
da_250 = xr.DataArray(dsa.random.random((n_time, px * 2, px * 2),
                      chunks=(1, 1000, 1000)),
                      dims=('time', 'lat', 'lon'),
                      coords={'time': time, 'lat': lat_250, 'lon': lon_250})

# upsampling
da_250i = da_500.interp(lat=lat_250, lon=lon_250)

#fake index
fNDVI = (da_250i-da_250)/(da_250i+da_250)

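# compute=False makes to_netcdf return a delayed object instead of None,
# so the write only runs when .compute() is called
fNDVI.to_netcdf(r'c:\temp\output.nc', compute=False).compute()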

This should reproduce the problem while avoiding the memory impact, as suggested by Ryan. In any case, the two datasets can be dumped to disk and then reloaded.

**Note: work seems to be under way to implement interpolation on chunked datasets, but it isn't fully available yet. Details here: https://github.com/pydata/xarray/pull/4155

Upvotes: 1

Views: 306

Answers (2)

Ryan

Reputation: 806

In response to your edited question, I have a new solution.

In order to interpolate across the lat / lon dimensions, you need to rechunk the data so that those dimensions are no longer split across chunks. I added this line before the interpolation step:

da_500 = da_500.chunk({'lat': -1, 'lon': -1})

After doing that, the computation executed without errors for me in distributed mode.

from dask.distributed import Client
client = Client()
# compute=False returns a delayed object, so .compute() triggers the write
fNDVI.to_netcdf(r'~/tmp/test.nc', compute=False).compute()

I did notice that the computation was rather memory intensive. I recommend monitoring the dask dashboard to see if you are running out of memory.
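As a rough illustration of what the rechunking step does, here is a minimal sketch on a deliberately tiny stand-in dataset. The sizes (3 time steps, 8 x 8 pixels, 4 x 4 chunks) are assumptions chosen so it runs instantly; the variable names mirror the question. It assumes scipy is installed, since xarray's `interp` relies on it.

```python
import numpy as np
import pandas as pd
import dask.array as dsa
import xarray as xr

# Tiny stand-in for the 500 m dataset (sizes are illustrative assumptions)
n_time, px = 3, 8
time = pd.date_range('1/1/2019', periods=n_time, freq='D')
lat_500 = np.linspace(19., 4., px)
lon_500 = np.linspace(34., 53., px)
da_500 = xr.DataArray(dsa.random.random((n_time, px, px), chunks=(1, 4, 4)),
                      dims=('time', 'lat', 'lon'),
                      coords={'time': time, 'lat': lat_500, 'lon': lon_500})

# -1 means "a single chunk spanning the whole dimension";
# the time dimension keeps its one-step-per-chunk layout
rechunked = da_500.chunk({'lat': -1, 'lon': -1})
print(rechunked.chunks)

# upsample to twice the resolution, as in the question
lat_250 = np.linspace(19., 4., px * 2)
lon_250 = np.linspace(34., 53., px * 2)
result = rechunked.interp(lat=lat_250, lon=lon_250).compute()
print(result.shape)
```

Each time step is still interpolated as its own task, which is why keeping the time chunks at size 1 helps bound the memory needed per worker.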

Upvotes: 1

Ryan

Reputation: 806

I believe there are two things that cause this example to crash, both likely related to memory usage:

  • You populate your original dataset with a large numpy array (np.random.random((n_time, px, px))) and then call .chunk after the fact. This forces Dask to pass a large object around in its graphs. Solution: use a lazy loading method.
  • Your object interp requires 47 GB of memory. This is too much for most computers to handle. Solution: add a reduction step before calling compute. This allows you to check whether your interpolation is working properly without simultaneously loading all the results into RAM.
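The 47 GB figure can be checked with a quick back-of-the-envelope calculation. This sketch assumes the values are stored as float64 (8 bytes each), which is what np.random.random and dask's random.random produce; the variable names are only illustrative.

```python
n_time = 365
px_out = 2000 * 2        # pixels per side after upsampling
bytes_per_value = 8      # float64

# size of the full interpolated array
total_bytes = n_time * px_out * px_out * bytes_per_value
# size of a single interpolated time step
per_step_bytes = px_out * px_out * bytes_per_value

print(f"full result:   {total_bytes / 1e9:.1f} GB")
print(f"one time step: {per_step_bytes / 1e6:.0f} MB")
```

A single time step is small enough to handle comfortably; it is holding all 365 of them in RAM at once that causes trouble.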

With these modifications, the code looks like this:

import numpy as np
import dask.array as dsa
import pandas as pd
import xarray as xr

n_time = 365
px = 2000 
lat = np.linspace(19., 4., px)
lon = np.linspace(34., 53., px)
time = pd.date_range('1/1/2019', periods=n_time, freq='D')

# use dask to lazily create the random data, not numpy
# this avoids populating the dask graph with large objects
data = xr.DataArray(dsa.random.random((n_time, px, px),
                    chunks=(1, px, px)),
                    dims=('time', 'lat', 'lon'),
                    coords={'time': time, 'lat': lat, 'lon': lon})

# upsampling 
nlat = np.linspace(19., 4., px*2)
nlon = np.linspace(34., 53., px*2)
# this object requires 47 GB of memory
# computing it directly is not an option on most computers
interp = data.interp(lat=nlat, lon=nlon)

# instead, we reduce in the time dimension before computing
interp.mean(dim='time').compute()

This ran in a few minutes on my laptop.

Upvotes: 1
