I'm trying to use dask to conduct larger-than-memory processing in xarray. Concretely, I'm trying to load a set of files, process them (e.g., regrid), and write the result to a zarr store, chunked such that there is only ever one chunk along time. (The resultant zarr store would then be used for further analysis, which is done along the time dimension, and therefore it needs to be unchunked along time.)
However, I'm having trouble setting up the workflow without causing memory use to balloon in the ds.to_zarr() call.
I'm trying to follow the Dask best practices (especially this one). A simplified version of the workflow is:
import dask
import numpy as np
import xarray as xr
import xesmf as xe
from distributed import Client

# Start dask client
client = Client()
display(client)

@dask.delayed
def load(fn_list):
    ds = xr.open_mfdataset(fn_list)
    return ds

@dask.delayed
def process(ds):
    # Do something to dataset, e.g., regridding
    ref_grid = xr.Dataset(coords={'lat': np.arange(-89.5, 89.6),
                                  'lon': np.arange(-179.5, 179.6)})
    rgrd = xe.Regridder(ds, ref_grid, 'conservative')
    ds = rgrd(ds)
    return ds

def workflow(fn_list):
    ds = load(fn_list)
    ds = process(ds)
    # Rechunk so time is a single chunk
    ds = ds.chunk({'time': -1, 'lat': 12, 'lon': 12})
    delayed = dask.delayed(ds.to_zarr)('test.zarr')
    return delayed

out = workflow(fn_list)
dask.compute(out)
From what I've been gathering while researching this problem, something about the way the task graph is set up causes the whole array to be loaded and sent to one worker when dask.compute() gets to the .to_zarr() call.
I guess my primary question is: why does the .to_zarr() call need everything in memory, and how does one set it up so that it doesn't?
Versions:
zarr == 2.18.3
xarray == 2024.9.0
dask == 2024.9.1