Reputation: 2354
I have a 37 GB .npy file that I would like to convert to a Zarr store so that I can include coordinate labels. I have code that does this in theory, but I keep running out of memory. I want to use Dask in between to facilitate doing this in chunks, but I still keep running out of memory.
The data is "thickness maps" for people's femoral cartilage. Each map is a 310x310 float array, and there are 47789 of these maps. So the data shape is (47789, 310, 310).
Step 1: Load the npy file as a memmapped Dask array.
fem_dask = dask.array.from_array(np.load('/Volumes/T7/cartilagenpy20220602/femoral.npy', mmap_mode='r'),
                                 chunks=(300, -1, -1))
Step 2: Make an xarray DataArray over the Dask array, with the desired coordinates. I have several coordinates for the 'map' dimension that come from metadata (a pandas DataFrame).
fem_xr = xr.DataArray(fem_dask, dims=['map', 'x', 'y'],
                      coords={'patient_id': ('map', metadata['patient_id']),
                              'side': ('map', metadata['side'].astype(np.string_)),
                              'timepoint': ('map', metadata['timepoint'])
                              })
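For reference, metadata is an ordinary pandas DataFrame with one row per map (47789 rows). The values below are made up purely for illustration; only the column names match what the code above assumes:

import pandas as pd

# Illustrative only -- made-up values, one row per map
metadata = pd.DataFrame({
    'patient_id': ['P0001', 'P0001', 'P0002'],
    'side': ['LEFT', 'RIGHT', 'LEFT'],
    'timepoint': [0, 12, 24],
})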
Step 3: Write to Zarr.
fem_ds = fem_xr.to_dataset(name='femoral') # Zarr requires Dataset, not DataArray
res = fem_ds.to_zarr('/Volumes/T7/femoral.zarr',
                     encoding={'femoral': {'dtype': 'float32'}},
                     compute=False)
res.visualize()
See the task graph below, if desired.
When I call res.compute(), RAM use quickly climbs out of control. At first, the other Python processes, which I think are the Dask workers, seem to be inactive. But a bit later they become active: one of those Python processes is now using 20 GB of RAM and another 36 GB, which the Dask dashboard also confirms.
Eventually all the workers get killed and the task errors out. How can I do this in an efficient way that correctly uses Dask, xarray, and Zarr, without running out of RAM (or melting the laptop)?
Upvotes: 4
Views: 1034
Reputation: 15432
If the dask workers can share threads, your code should just work. If you don't initialize a dask Cluster explicitly, dask.Array will create one with default args, which use processes. This results in the behavior you're seeing. To solve this, explicitly create a cluster using threads:
import numpy as np, dask.array as dda, dask.distributed

# use threads, not processes
cluster = dask.distributed.LocalCluster(processes=False)
client = dask.distributed.Client(cluster)

arr = np.load('myarr.npy', mmap_mode='r')
da = dda.from_array(arr).rechunk(chunks=(100, 310, 310))
da.to_zarr('myarr.zarr', mode='w')
If you're using a cluster whose workers cannot share threads, such as a JobQueue or KubernetesCluster, you can use the following to read the npy file, assuming it's on a networked filesystem or is otherwise available to all workers.
Here's a workflow that creates an empty array from the memory map, then maps the read job over it using dask.array.map_blocks. The key is the optional block_info keyword, which gives the location of each block within the full array; we can use it to slice new mmap array objects on the dask workers:
def load_npy_chunk(da, fp, block_info=None, mmap_mode='r'):
    """Load a slice of the .npy array, making use of the block_info kwarg"""
    np_mmap = np.load(fp, mmap_mode=mmap_mode)
    array_location = block_info[0]['array-location']
    dim_slicer = tuple(list(map(lambda x: slice(*x), array_location)))
    return np_mmap[dim_slicer]


def dask_read_npy(fp, chunks=None, mmap_mode='r'):
    """Read metadata by opening the mmap, then send the read job to workers"""
    np_mmap = np.load(fp, mmap_mode=mmap_mode)
    da = dda.empty_like(np_mmap, chunks=chunks)
    return da.map_blocks(load_npy_chunk, fp=fp, mmap_mode=mmap_mode, meta=da)
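As a side note (not part of the original answer), you can see exactly what map_blocks passes in block_info by printing it from a toy function on a small array with the same chunking. The 'array-location' entry is the list of per-dimension (start, stop) pairs that load_npy_chunk converts into slices; the exact set of keys may vary between dask versions:

import numpy as np, dask.array as dda

def peek(block, block_info=None):
    # For the first (300, 310, 310) chunk this prints
    # [(0, 300), (0, 310), (0, 310)] -- (start, stop) for each dimension
    print(block_info[0]['array-location'])
    return block

arr = dda.zeros((900, 310, 310), chunks=(300, -1, -1), dtype='float32')
arr.map_blocks(peek, meta=np.empty((0, 0, 0), dtype=arr.dtype)).compute()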
The workflow above works for me on a demo of the same size (you could add the xarray.DataArray creation/formatting step at the end, as sketched after the demo; the dask ops work fine and worker memory stays below 1 GB for me):
import numpy as np, dask.array as dda, xarray as xr, pandas as pd, dask.distributed
### insert/import above functions here
# save a large numpy array
np.save('myarr.npy', np.empty(shape=(47789, 310, 310), dtype=np.float32))
cluster = dask.distributed.LocalCluster()
client = dask.distributed.Client(cluster)
da = dask_read_npy('myarr.npy', chunks=(300, -1, -1), mmap_mode='r')
da.to_zarr('myarr.zarr', mode='w')
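A minimal sketch of the xarray step mentioned above, layered on top of the lazily-read array da. This mirrors the code from the question; metadata is assumed to be the question's pandas DataFrame with patient_id, side, and timepoint columns:

# Sketch only: wrap the dask array with coordinates, then write to Zarr
fem_xr = xr.DataArray(da, dims=['map', 'x', 'y'],
                      coords={'patient_id': ('map', metadata['patient_id']),
                              'side': ('map', metadata['side'].astype(np.string_)),
                              'timepoint': ('map', metadata['timepoint'])})
fem_ds = fem_xr.to_dataset(name='femoral')  # to_zarr needs a Dataset
fem_ds.to_zarr('femoral.zarr', mode='w', encoding={'femoral': {'dtype': 'float32'}})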
Upvotes: 4