Reputation: 47
I am trying to calculate wind speed from u and v components for 40 years of hourly data at 0.1 x 0.1 degree resolution. The u and v netCDF files for a single year are about 5 GB each. I have implemented a basic for loop in which the u and v netCDF files for each year are opened through xarray's open_dataset, rechunked so that they become dask arrays, used for the calculation, and the result exported as a new netCDF file.
When the loop runs, the first iteration finishes almost instantaneously, but the next iteration takes so long that it appears to be stalled. I do not understand which part of my code is the bottleneck here and why. Any help would be appreciated. Note that I have set up the dask scheduler to request resources adaptively. I am attaching the relevant code snippet for reference:
import gc
import glob

import numpy as np
import xarray as xr
from dask.distributed import Client
from dask_jobqueue import PBSCluster

cluster = PBSCluster(cores=1, memory='8GB', queue='standard', project='civil',
                     interface='ib0', walltime='00:20:00')
cluster.adapt(minimum=1, maximum=8)
client = Client(cluster)

for i in range(1979, 2019):
    u_dir = glob.glob('../u_wind/uwind_hourly_' + str(i) + '*.nc')
    v_dir = glob.glob('../v_wind/vwind_hourly_' + str(i) + '*.nc')
    w_dir = './wind/wind_hourly_' + str(i) + '-' + str(i) + '.nc'
    u_wind = xr.open_dataset(u_dir[0])
    v_wind = xr.open_dataset(v_dir[0])
    u_wind_rechunk = u_wind.chunk({'time': 720})
    v_wind_rechunk = v_wind.chunk({'time': 720})
    u_var = u_wind_rechunk['UGRD_10m']
    v_var = v_wind_rechunk['VGRD_10m']
    wind_speed = xr.Dataset()
    wind_speed = wind_speed.assign(wind_speed=np.sqrt(u_var**2 + v_var**2))
    wind_speed.to_netcdf(w_dir)
    del u_wind, v_wind, u_wind_rechunk, v_wind_rechunk, u_var, v_var, wind_speed
    gc.collect()
Upvotes: 1
Views: 313
Reputation: 16561
As it stands, your code is still serial rather than parallel: in particular, wind_speed.to_netcdf(w_dir) triggers computation right away, so each year is computed and written in full before the next iteration starts. The code below might require some adjustment, but the main point is to parallelise your operations across years:
def single_run(i):
    # nothing in the body below is modified relative to the original loop
    u_dir = glob.glob('../u_wind/uwind_hourly_' + str(i) + '*.nc')
    v_dir = glob.glob('../v_wind/vwind_hourly_' + str(i) + '*.nc')
    w_dir = './wind/wind_hourly_' + str(i) + '-' + str(i) + '.nc'
    u_wind = xr.open_dataset(u_dir[0])
    v_wind = xr.open_dataset(v_dir[0])
    u_wind_rechunk = u_wind.chunk({'time': 720})
    v_wind_rechunk = v_wind.chunk({'time': 720})
    u_var = u_wind_rechunk['UGRD_10m']
    v_var = v_wind_rechunk['VGRD_10m']
    wind_speed = xr.Dataset()
    wind_speed = wind_speed.assign(wind_speed=np.sqrt(u_var**2 + v_var**2))
    wind_speed.to_netcdf(w_dir)
    del u_wind, v_wind, u_wind_rechunk, v_wind_rechunk, u_var, v_var, wind_speed
    gc.collect()
# new parts: wrap each year in dask.delayed and compute them in parallel
import dask

run_me = dask.compute([dask.delayed(single_run)(i) for i in range(1979, 2019)])
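If you would rather keep a plain loop, another option is to stay lazy end to end: open the files with chunks= so the variables are dask arrays from the start, defer each write with to_netcdf(..., compute=False) (which returns a dask.delayed object), and then compute all the deferred writes at once. A minimal sketch, assuming the same file layout and variable names as in your snippet:

import glob

import dask
import numpy as np
import xarray as xr

writes = []
for i in range(1979, 2019):
    u_path = glob.glob('../u_wind/uwind_hourly_' + str(i) + '*.nc')[0]
    v_path = glob.glob('../v_wind/vwind_hourly_' + str(i) + '*.nc')[0]
    w_path = './wind/wind_hourly_' + str(i) + '-' + str(i) + '.nc'
    # chunks= gives dask-backed variables, so nothing is loaded eagerly here
    u_var = xr.open_dataset(u_path, chunks={'time': 720})['UGRD_10m']
    v_var = xr.open_dataset(v_path, chunks={'time': 720})['VGRD_10m']
    wind_speed = np.sqrt(u_var**2 + v_var**2).to_dataset(name='wind_speed')
    # compute=False returns a dask.delayed object instead of writing now
    writes.append(wind_speed.to_netcdf(w_path, compute=False))
# run all the deferred writes together on the cluster
dask.compute(*writes)

Either way, the work only starts when dask.compute is called, so the cluster can process several years at once instead of one at a time.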
Upvotes: 2