astoeriko
astoeriko

Reputation: 890

How to avoid large objects in task graph

I am running simulations using dask.distributed. My model is defined in a delayed function and I stack several realizations. A simplified version of what I do is given in this code snippet:

import numpy as np
import xarray as xr
import dask.array as da
import dask
from dask.distributed import Client
from itertools import repeat 

@dask.delayed
def run_model(n_time,a,b):
    result = np.array([a*np.random.randn(n_time)+b])
    return result

client = Client()

# Parameters
n_sims = 10000
n_time = 100
a_vals = np.random.randn(n_sims)
b_vals = np.random.randn(n_sims)
output_file = 'out.nc'

# Run simulations
out = da.stack([da.from_delayed(run_model(n_time,a,b),(1,n_time,),np.float64) for a,b in zip(a_vals, b_vals)])

# Store output in a dataframe
ds = xr.Dataset({'var1': (['realization', 'time'], out[:,0,:])},
             coords={'realization': np.arange(n_sims),
                     'time': np.arange(n_time)*.1})

# Save to a netcdf file -> at this point, computations will be carried out
ds.to_netcdf(output_file)

If I want to run a lot of simulations I get the following warning:

/home/user/miniconda3/lib/python3.6/site-packages/distributed/worker.py:840: UserWarning: Large object of size 2.73 MB detected in task graph: 
  ("('getitem-32103d4a23823ad4f97dcb3faed7cf07', 0,  ... cd39>]), False)
Consider scattering large objects ahead of time
with client.scatter to reduce scheduler burden and keep data on workers

    future = client.submit(func, big_data)    # bad

    big_future = client.scatter(big_data)     # good
    future = client.submit(func, big_future)  # good
  % (format_bytes(len(b)), s))

As far as I understand (from this and this question), the method proposed by the warning helps in getting large data into the function. However, my inputs are all scalar values, so they should not take up nearly 3MB of memory. Even if the function run_model() does not take any argument at all (so no parameters are passed), I get the same warning.

I also had a look at the task graph to see whether there is some step that requires loading lots of data. For three realizations it looks like this: Task graph of the code

So it seems to me that every realization is handled separately which should keep the amount of data to deal with low.

I would like to understand what actually is the step that produces a large object and what I need to do to break it down into smaller parts.

Upvotes: 2

Views: 4011

Answers (1)

mdurant
mdurant

Reputation: 28684

The message is, in this case, slightly misleading. The issue is demonstrated by the following:

> len(out[:, 0, :].dask)
40000
> out[:, 0, :].npartitions
10000

and the pickled size of that graph (whose head is the getitem key in the message) is the ~3MB. By creating a dask-array for each element of the computation, you end up with a stacked array with as many partitions as elements, and the model run operation and item selection, as well as storage operation are being applied to every single one and stored in the graph. Yes, they are independent, and likely this entire computation would complete, but this is all very wasteful, unless the model making function runs for considerable time on each input scalar.

In your real situation, it may be that the inner arrays are in fact bigger than the one-element version you present, but in the general case of doing numpy operations on arrays, it is normal to create the arrays on the workers (with random or some load function) and operate on partitions with sizes >100MB.

Upvotes: 2

Related Questions