user73445

Reputation: 23

Dask high memory usage when computing two values with common dependency

I am using Dask on a single machine (LocalCluster with 4 processes, 16 threads, 68.56GB memory) and am running into worker memory problems when trying to compute two results at once that share a dependency.

In the example shown below, computing result with just a single computation runs fine and quickly, with the workers' combined memory usage maxing out at around 1GB. However, computing results with two computations makes the workers quickly use all of their memory and start spilling to disk once total memory usage reaches around 40GB. The computation does eventually finish, but, as expected, there is a massive slowdown once it starts writing to disk.

Intuitively, if one chunk is read in and both of its column sums are computed immediately, then the chunk can be discarded and memory usage stays low. However, it appears that Dask is prioritizing loading the data over the later aggregation tasks that would free up memory.

Any help understanding what's going on here would be greatly appreciated. How can I compute two results with a common dependency without reading the underlying data twice or reading it fully into memory?

import dask
import dask.dataframe as dd
import dask.array as da
from dask.distributed import Client

client = Client("localhost:8786")

array = da.random.normal(size=(int(1e9), 10), chunks=(int(1e6), 10))
df = dd.from_array(array, columns=[str(i) for i in range(10)])

# does not blow up worker memory, overall usage stays below 1GB total
result = dask.compute(df["0"].sum())

# does blow up worker memory
results = dask.compute([df["0"].sum(), df["1"].sum()])

Upvotes: 2

Views: 1213

Answers (1)

SultanOrazbayev

Reputation: 16561

The way the array is constructed, every time a chunk is created it has to generate every column of the array. So one opportunity for optimization (if possible) is to generate/load the array in a way that allows for column-wise processing. This reduces the memory load of a single task.
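As a rough sketch of column-wise generation (using random data as a stand-in for the real source, so purely illustrative):

import dask
import dask.array as da

# hypothetical column-wise construction: each column is its own 1-D array,
# so a single task only materializes one column's chunk
col0 = da.random.normal(size=(int(1e9),), chunks=(int(1e6),))
col1 = da.random.normal(size=(int(1e9),), chunks=(int(1e6),))

sums = dask.compute(col0.sum(), col1.sum())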

Another avenue for optimization is to explicitly specify the common dependencies, for example dask.compute(df[['0', '1']].sum()) will run efficiently.
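As a sketch of what that single call looks like with the df from the question:

# one compute call covering both columns: each chunk is read once and both
# column sums are reduced from it before the chunk can be released
(col_sums,) = dask.compute(df[["0", "1"]].sum())
print(col_sums["0"], col_sums["1"])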

However, the more important point is that by default dask follows some rules of thumb on how to prioritize work, see here. You have several options to intervene (not sure if this list is exhaustive): custom priorities, resource constraints, or modifying the compute graph (to allow workers to release memory from intermediate tasks without waiting for the final task to complete).
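As an illustration of custom priorities (a sketch only; the priority value is an assumption and not a guaranteed fix for the memory issue):

# compute through the distributed client with a higher priority so the
# scheduler favours these tasks over newly loaded chunks
future = client.compute(df[["0", "1"]].sum(), priority=10)
col_sums = future.result()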

A simple way to modify the graph is to break the dependency between the final sum and all the intermediate tasks by computing the intermediate sums manually:

[results] = dask.compute([df["0"].map_partitions(sum), df["1"].map_partitions(sum)])

Note that results will be a list of two collections of per-partition sums, but it's trivial to sum each of them afterwards (trying to run sum on the lazy objects would trigger computation, so it's more efficient to run sum after the results are computed).
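For instance, finishing the reduction locally might look like this (assuming results from the line above):

# each element of results holds the per-partition sums for one column;
# finish the reduction in plain Python
total_0 = sum(results[0])
total_1 = sum(results[1])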

Upvotes: 2
