Johnmimo

Reputation: 11

Poor CPU utilization when transforming NetCDF files to Zarr and rechunking

I am transforming and rechunking data from NetCDF to Zarr. The process is slow and does not use much of the CPUs. I have tried several different configurations; sometimes it seems to do slightly better, but it hasn't worked well. Does anyone have any tips for making this run more efficiently?

On the last attempt (and some, perhaps all, of the previous attempts), running on a single machine with the distributed scheduler and threads, the logs gave this message:

distributed.core - INFO - Event loop was unresponsive in Worker for 10.05s. This is often caused by long-running GIL-holding functions or moving large chunks of data.

[Plot: CPU utilization over time. It starts low, and then drops even lower.]
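For what it's worth, the "GIL-holding functions" part of that warning points at one thing I have not fully tested yet: running several worker processes with only a few threads each, instead of a single threaded worker, so that a task holding the GIL cannot stall the whole event loop. A rough sketch only; the worker counts and memory limit below are placeholder values, not settings I know to be right for this job:

from dask.distributed import Client, LocalCluster

# Placeholder values: tune n_workers, threads_per_worker and memory_limit to the machine.
cluster = LocalCluster(
    n_workers=4,              # several worker processes instead of one threaded worker
    threads_per_worker=2,     # keep threads per process low to limit GIL contention
    memory_limit="8GB",       # per-worker memory cap
)
client = Client(cluster)
print(client.dashboard_link)  # the dashboard shows per-task CPU and memory use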

I have previously had errors with memory being exhausted, so I am writing the Zarr in pieces, using the "stepwise_to_zarr" function below:

import numpy as np
import xarray as xr
import zarr

def stepwise_to_zarr(dataset, step_dim, step_size, chunks, out_loc, group):
    # Split the dataset into slabs of width `step_size` along `step_dim`
    # and write them to the Zarr store one at a time to limit memory use.
    start = dataset[step_dim].min()
    end = dataset[step_dim].max()
    iis = np.arange(start, end, step_size)
    if end > iis[-1]:
        iis = np.append(iis, end)

    # Coordinate values along the stepping dimension (longitude here).
    lon = dataset.get_index(step_dim)

    first = True
    for i in range(1, len(iis)):
        lower, upper = iis[i - 1], iis[i]
        # Include the upper bound only for the final slab.
        if upper >= end:
            lon_list = [l for l in lon if lower <= l <= upper]
        else:
            lon_list = [l for l in lon if lower <= l < upper]
        sub = dataset.sel({step_dim: lon_list})

        rechunked_sub = sub.chunk(chunks)
        write_sync = zarr.ThreadSynchronizer()

        if first:
            # The first slab creates the store ...
            rechunked_sub.to_zarr(out_loc, group=group,
                consolidated=True, synchronizer=write_sync, mode="w")
            first = False
        else:
            # ... subsequent slabs are appended along `step_dim`.
            rechunked_sub.to_zarr(out_loc, group=group,
                consolidated=True, synchronizer=write_sync, append_dim=step_dim)


chunks = {'time': 8760, 'latitude': 21, 'longitude': 20}
ds = xr.open_mfdataset("path to data", parallel=True, combine="by_coords")
stepwise_to_zarr(ds, step_size=10, step_dim="longitude",
    chunks=chunks, out_loc="path to output", group="group name")

In the plot above, the drop from ~6% utilization to ~0.5% utilization seems to coincide with the first "batch" of 10 degrees of longitude being finished.

Background info:

I have tried:

export OMP_NUM_THREADS=1
export MKL_NUM_THREADS=1
export OPENBLAS_NUM_THREADS=1

as described in the Dask array best practices (https://docs.dask.org/en/latest/array-best-practices.html?highlight=export#avoid-oversubscribing-threads).

Upvotes: 1

Views: 390

Answers (1)

Johnmimo

Reputation: 11

I ended up solving my problem by writing to an intermediate Zarr store with chunks {'time': 8760, 'latitude': 260, 'longitude': 360}. This went fast, even though the CPU resources were only fully utilized for a relatively small portion of the job. I then read this intermediate Zarr and stored it with the final chunking, using a modified version of the stepwise process described in the question. This gave acceptable, although not ideal, performance.

[Plot: CPU utilization when writing to the intermediate store]

[Plot: CPU utilization when writing from the intermediate to the final store]

Here is the code:

import numpy as np
import xarray as xr
import zarr

def stepwise_to_zarr(dataset, step_dim, step_size, encoding, out_loc, group, include_end=True):
    # Same stepwise approach as in the question, but writing with an explicit
    # encoding and deferring computation of each slab until to_zarr returns.
    start = dataset[step_dim].min()
    end = dataset[step_dim].max()
    iis = np.arange(start, end, step_size)
    if end > iis[-1]:
        iis = np.append(iis, end)

    lon = dataset.get_index(step_dim)
    first = True
    for i in range(1, len(iis)):
        lower, upper = iis[i - 1], iis[i]
        if upper >= end and include_end:
            lon_list = [l for l in lon if lower <= l <= upper]
        else:
            lon_list = [l for l in lon if lower <= l < upper]

        sub = dataset.sel({step_dim: lon_list})
        write_sync = zarr.ThreadSynchronizer()
        if first:
            # The first slab creates the store with the target encoding ...
            sub_write = sub.to_zarr(out_loc,
                                    group=group,
                                    consolidated=True,
                                    synchronizer=write_sync,
                                    encoding=encoding,
                                    mode="w", compute=False)
            first = False
        else:
            # ... later slabs are appended along `step_dim`.
            sub_write = sub.to_zarr(out_loc,
                                    group=group,
                                    consolidated=True,
                                    synchronizer=write_sync,
                                    append_dim=step_dim,
                                    compute=False)
        sub_write.compute(retries=2)

z = xr.open_zarr(input_loc, group=groupname)
new_chunks = {'time': 8760, 'latitude': 21, 'longitude': 20}
z_rechunked = z.chunk(new_chunks)

# Workaround to avoid: NotImplementedError: Specified zarr chunks (8760, 260, 360)
# would overlap multiple dask chunks.
# See https://github.com/pydata/xarray/issues/2300
encoding = {}
for v in ['var1', 'var2', 'var3']:
    encoding[v] = z[v].encoding.copy()
    encoding[v]["chunks"] = (96408, 21, 20)

stepwise_to_zarr(z_rechunked, "longitude", 60, encoding, output_loc, group=groupname)

Note that I had to overwrite the encodings to be able to rechunk the Zarrs.

This process worked, but it was a bit cumbersome. I only did it this way because I had not heard of rechunker. The next time I need to rechunk, I will try rechunker to see if it takes care of the issue.
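For reference, here is a rough sketch of what the rechunker route might look like. I have not run this on the dataset above, so the store paths, variable names and max_mem value are placeholders, and the coordinate arrays may need their own entries in target_chunks:

import zarr
from rechunker import rechunk

# Rough sketch only: paths, variable names and max_mem are placeholders.
source_group = zarr.open_group("intermediate_store.zarr")

target_chunks = {
    "var1": {"time": 8760, "latitude": 21, "longitude": 20},
    "var2": {"time": 8760, "latitude": 21, "longitude": 20},
    # leave the coordinate arrays unchanged
    "time": None, "latitude": None, "longitude": None,
}

plan = rechunk(source_group,
               target_chunks=target_chunks,
               max_mem="4GB",                      # memory budget per worker
               target_store="final_store.zarr",    # rechunked output store
               temp_store="rechunker_temp.zarr")   # scratch store rechunker needs
plan.execute()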

Upvotes: 0
