ks905383

Reputation: 180

Storing larger-than-memory xarray dataset to zarr using `dask.delayed` without blowing up memory

I'm trying to use dask to conduct larger-than-memory processing in xarray. Concretely, I'm trying to:

- load a dataset from a list of files with xr.open_mfdataset
- process it (e.g., regrid it with xesmf)
- rechunk it so that it is unchunked (contiguous) along time
- save the result to a zarr store

(the resultant zarr store would then be used for further analysis, which is done along the time dimension, and therefore it needs to be unchunked along time).

However, I'm having trouble setting up the workflow without causing memory use to balloon in the ds.to_zarr() call.

I'm trying to follow the Dask best practices (especially this one). A simplified version of the workflow is:

import dask
import xarray as xr
import numpy as np
import xesmf as xe
from distributed import Client

# Start dask client
client = Client()
display(client)

@dask.delayed
def load(fn_list):
   ds = xr.open_mfdataset(fn_list)
   return ds

@dask.delayed
def process(ds):
   # Do something to dataset, e.g., regridding
   ref_grid = xr.Dataset(coords = {'lat':np.arange(-89.5,89.6),
                                   'lon':np.arange(-179.5,179.6)})
   rgrd = xe.Regridder(ds,ref_grid,'conservative')
   
   ds = rgrd(ds)
   return ds

def workflow(fn_list):
   ds = load(fn_list)
   
   ds = process(ds)

   # Rechunk
   ds = ds.chunk({'time':-1,'lat':12,'lon':12})

   delayed = dask.delayed(ds.to_zarr)('test.zarr')
   return delayed

out = workflow(fn_list)  # fn_list: list of input file paths
dask.compute(out)

From what I've gathered while researching this problem, something about the way the task graph is set up causes the whole array to be loaded and sent to a single worker when dask.compute() reaches the .to_zarr() call.

I guess my primary question is: why does the .to_zarr() call need everything in memory, and how does one set it up so that it doesn't?

Versions:

zarr == 2.18.3
xarray == 2024.9.0
dask == 2024.9.1

Upvotes: 0

Views: 64

Answers (1)

ks905383

Reputation: 180

Excellent discussion on this here.
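
In short, a minimal sketch of one way to keep memory bounded (not quoted from that discussion; paths, chunk sizes, and the regridding method are illustrative): skip the dask.delayed wrappers entirely. xr.open_mfdataset already returns lazy, dask-backed arrays, and calling ds.to_zarr on such a dataset writes it chunk by chunk, so the full array never has to sit on one worker.

import xarray as xr
import numpy as np
import xesmf as xe
from distributed import Client

client = Client()

fn_list = ['file_0001.nc', 'file_0002.nc']  # illustrative input paths

# Lazy open: variables become dask arrays, nothing is read into memory yet
ds = xr.open_mfdataset(fn_list, chunks={'time': 100})

# Regrid lazily; xesmf applies the weights blockwise to the dask arrays
# (bilinear here for illustration; conservative additionally needs cell bounds)
ref_grid = xr.Dataset(coords={'lat': np.arange(-89.5, 89.6),
                              'lon': np.arange(-179.5, 179.6)})
rgrd = xe.Regridder(ds, ref_grid, 'bilinear')
ds = rgrd(ds)

# Rechunk so time is contiguous, then stream straight to zarr:
# each chunk is computed and written by a worker independently
ds = ds.chunk({'time': -1, 'lat': 12, 'lon': 12})
ds.to_zarr('test.zarr', mode='w')

The rechunk from time-chunked input to time-contiguous output is still the expensive step; if that alone strains memory, doing it as a separate pass (e.g., with the rechunker package) is a common fallback.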

Upvotes: 0
