Severin

Reputation: 311

Unaccountable Dask memory usage

I am digging into Dask and (mostly) feel comfortable with it. However, I cannot understand what is going on in the following scenario. TBH, I'm sure a question like this has been asked in the past, but after searching for a while I can't seem to find one that really hits the nail on the head. So here we are!

In the code below, you can see a simple Python function with a Dask delayed decorator on it. In my real use case this would be a "black box" type function, within which I don't care what happens, so long as it stays within a 4 GB memory budget and ultimately returns a pandas dataframe. In this case I've specifically chosen the value N=1.5e8, since this results in a total memory footprint of nearly 2.2 GB (large, but still well within the budget). Finally, when executing this file as a script, I have a "data pipeline" which simply runs the black-box function for some number of IDs and, in the end, builds up a result dataframe (which I could then do more stuff with).
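(To spell out where that 2.2 GB figure comes from, assuming np.arange produces float64 columns here, 8 bytes per element:)

1.5e8 * 2 * 8 / 2**30  # two columns of 1.5e8 float64 values ≈ 2.235 GiB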

The confusing bit comes in when this is executed. I can see that only two function calls are executed at once (which is what I would expect), but I receive the warning message distributed.worker - WARNING - Memory use is high but worker has no data to store to disk. Perhaps some other process is leaking memory? Process memory: 3.16 GiB -- Worker memory limit: 3.73 GiB, and shortly thereafter the script exits prematurely. Where is this memory usage coming from? Note that if I increase memory_limit="8GB" (which is actually more than my computer has), then the script runs fine and my print statement informs me that the dataframe is indeed only utilizing 2.2 GB of memory.
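(Side note, in case the units confuse anyone else: the 3.73 GiB in the warning is just the configured 4 GB limit expressed in binary units:)

4e9 / 2**30  # ≈ 3.73 GiB, the "Worker memory limit" shown in the warning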

Please help me understand this behavior and, hopefully, implement a more memory-safe approach.

Many thanks!

Here is the code:


import time

import pandas as pd
import numpy as np
from dask.distributed import LocalCluster, Client
import dask


@dask.delayed
def do_pandas_thing(id):
    print(f"STARTING: {id}")
    N = 1.5e8
    df = pd.DataFrame({"a": np.arange(N), "b": np.arange(N)})

    print(
        f"df memory usage {df.memory_usage().sum()/(2**30):.3f} GB",
    )

    # Simulate a "long" computation
    time.sleep(5)

    return df.iloc[[-1]]  # return the last row


if __name__ == "__main__":
    cluster = LocalCluster(
        n_workers=2,
        memory_limit="4GB",
        threads_per_worker=1,
        processes=True,
    )
    client = Client(cluster)

    # Evaluate "black box" functions with pandas inside
    results = []
    for i in range(10):
        results.append(do_pandas_thing(i))

    # compute
    r = dask.compute(results)[0]

    print(pd.concat(r, ignore_index=True))

Upvotes: 3

Views: 650

Answers (1)

SultanOrazbayev

Reputation: 16561

I am unable to reproduce the warning/error with the following versions:

  • pandas=1.2.4
  • dask=2021.4.1
  • python=3.8.8

When the object size is increased, the process does crash due to memory pressure, but in general it's a good idea to keep individual workloads to a small fraction of the available memory:

To put it simply, we weren't thinking about analyzing 100 GB or 1 TB datasets in 2011. Nowadays, my rule of thumb for pandas is that you should have 5 to 10 times as much RAM as the size of your dataset. So if you have a 10 GB dataset, you should really have about 64, preferably 128 GB of RAM if you want to avoid memory management problems. This comes as a shock to users who expect to be able to analyze datasets that are within a factor of 2 or 3 the size of their computer's RAM.

source
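If the contents of the black box are something you can influence, one general way to keep each task well within the worker budget is to avoid materializing the full frame at once. The sketch below is only an illustration of that idea (do_pandas_thing_chunked and the chunking parameters are made up, not part of your pipeline); if the function really is opaque, then raising memory_limit, as you observed, or running fewer but larger workers is the practical alternative.

import time

import pandas as pd
import numpy as np
import dask


@dask.delayed
def do_pandas_thing_chunked(task_id, n=int(1.5e8), chunk=int(1e7)):
    # Build the data in chunks so the per-task peak is roughly
    # chunk * 2 columns * 8 bytes (~160 MB here) instead of ~2.2 GB.
    last = None
    for start in range(0, n, chunk):
        stop = min(start + chunk, n)
        part = pd.DataFrame(
            {
                "a": np.arange(start, stop, dtype="float64"),
                "b": np.arange(start, stop, dtype="float64"),
            }
        )
        last = part.iloc[[-1]]  # keep only the piece the pipeline needs

    # Simulate a "long" computation
    time.sleep(5)
    return last

With that version, each task should stay in the low hundreds of MB, so two concurrent tasks per 4 GB worker leave plenty of headroom; it can be swapped into the loop in place of do_pandas_thing without changing anything else.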

Upvotes: 1
