Reputation: 23
I am using Dask on a single machine (LocalCluster with 4 processes, 16 threads, 68.56 GB memory) and am running into worker memory problems when trying to compute two results at once which share a dependency.
In the example shown below, computing result with just one computation runs fine and quickly, with the workers' combined memory usage maxing out at around 1 GB. However, when computing results with two computations, the workers quickly use all of their memory and start to write to disk once total memory usage reaches around 40 GB. The computation will eventually finish, but there is a massive slowdown, as would be expected once it starts writing to disk.
Intuitively, if one chunk is read in and its two sums are computed immediately, then the chunk can be discarded and memory usage stays low. However, it appears that Dask is prioritizing the loading of the data over the later aggregate computations that would free memory.
Any help understanding what's going on here would be greatly appreciated. How can I compute two results with a common dependency without needing to read the underlying data twice or read it fully into memory?
import dask
import dask.dataframe as dd
import dask.array as da
from dask.distributed import Client
client = Client("localhost:8786")
array = da.random.normal(size=(int(1e9), 10), chunks=(int(1e6), 10))
df = dd.from_array(array, columns=[str(i) for i in range(10)])
# does not blow up worker memory, overall usage stays below 1GB total
result = dask.compute(df["0"].sum())
# does blow up worker memory
results = dask.compute([df["0"].sum(), df["1"].sum()])
Upvotes: 2
Views: 1213
Reputation: 16561
The way the array is constructed, every time a chunk is created it has to generate every column of the array. So one opportunity for optimization (if possible) is to generate/load the array in a way that allows for column-wise processing. This will reduce the memory load of a single task.
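For example, a minimal sketch of the column-wise idea, reusing the question's imports (the chunking below is illustrative and assumes the data can be generated or loaded one column at a time):
# chunk each column separately so a single task only ever holds one column of a block (~8 MB here)
array = da.random.normal(size=(int(1e9), 10), chunks=(int(1e6), 1))
# sum the two columns directly on the array
col0_sum, col1_sum = dask.compute(array[:, 0].sum(), array[:, 1].sum())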
Another avenue for optimization is to explicitly specify the common dependencies, for example dask.compute(df[['0', '1']].sum()) will run efficiently.
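Spelled out (dask.compute returns a tuple, and the single element here is a pandas Series indexed by column name):
# compute both column sums in one shared pass over the data
(sums,) = dask.compute(df[["0", "1"]].sum())
col0_total = sums["0"]
col1_total = sums["1"]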
However, the more important point is that by default dask follows some rules of thumb on how to prioritize work; see the Dask documentation on scheduling policies. You have several options to intervene (not sure if this list is exhaustive): custom priorities, resource constraints, or modifying the compute graph (to allow workers to release memory from intermediate tasks without waiting for the final task to complete).
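As a hedged sketch of the custom-priorities option (it assumes a dask version that supports dask.annotate, and depending on the version graph optimization can drop annotations, so this is a nudge to the scheduler rather than a guaranteed fix):
# give the reduction layers a higher priority than the data-generation tasks
with dask.annotate(priority=10):
    sum0 = df["0"].sum()
    sum1 = df["1"].sum()
results = dask.compute(sum0, sum1)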
A simple way to modify the graph is to break the dependency between the final sum and all of the intermediate tasks by computing the intermediate sums manually:
[results] = dask.compute([df["0"].map_partitions(sum), df["1"].map_partitions(sum)])
Note that results will be a list of two sequences of per-partition sums, but it's trivial to sum each of them (trying to run sum on a delayed object would trigger computation, so it's more efficient to run sum after results are computed).
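Concretely, using the names from the snippet above:
total_0 = sum(results[0])  # add up the per-partition sums for column "0"
total_1 = sum(results[1])  # and for column "1"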
Upvotes: 2