DataWiz
DataWiz

Reputation: 441

Dask memory usage exploding even for simple computations

I have a parquet folder created with dask containing multiple files of about 100MB each. When I load the dataframe with df = dask.dataframe.read_parquet(path_to_parquet_folder), and run any sort of computation (such as df.describe().compute()), my kernel crashes.

Things I have noticed:

EDIT:

I tried to create a reproducible example, without success, but I discovered some other oddities, seemingly all related to the newer pandas dtypes that I'm using:

import pandas as pd
from dask.diagnostics import ProgressBar
ProgressBar().register()
from dask.diagnostics import ResourceProfiler
rprof = ResourceProfiler(dt=0.5)
import dask.dataframe as dd

# generate dataframe with 3 different nullable dtypes and n rows
n = 10000000
test = pd.DataFrame({
    1:pd.Series(['a', pd.NA]*n, dtype = pd.StringDtype()), 
    2:pd.Series([1, pd.NA]*n, dtype = pd.Int64Dtype()),
    3:pd.Series([0.56, pd.NA]*n, dtype = pd.Float64Dtype())
})

dd_df = dd.from_pandas(test, npartitions = 2) # convert to dask df

dd_df.to_parquet('test.parquet') # save as parquet directory

dd_df = dd.read_parquet('test.parquet') # load files back

dd_df.mean().compute() # compute something
dd_df.describe().compute() # compute something
dd_df.count().compute() # compute something
dd_df.max().compute() # compute something

Output, respectively:

KeyError: "None of [Index(['2', '1', '3'], dtype='object')] are in the [columns]"

KeyError: "None of [Index(['2', '1', '3'], dtype='object')] are in the [columns]"

Kernel appears to have died.

KeyError: "None of [Index(['2', '1', '3'], dtype='object')] are in the [columns]"

It seems that the dtypes are preserved even throughout the parquet IO, but dask has some trouble actually doing anything with these columns.

Python version: 3.9.7 dask version: 2021.11.2

Upvotes: 1

Views: 538

Answers (1)

SultanOrazbayev
SultanOrazbayev

Reputation: 16561

It seems the main error is due to NAType which is not yet fully supported by numpy (version 1.21.4):

~/some_env/python3.8/site-packages/numpy/core/_methods.py in _var(a, axis, dtype, out, ddof, keepdims, where)
    240     # numbers and complex types with non-native byteorder
    241     else:
--> 242         x = um.multiply(x, um.conjugate(x), out=x).real
    243 
    244     ret = umr_sum(x, axis, dtype, out, keepdims=keepdims, where=where)

TypeError: loop of ufunc does not support argument 0 of type NAType which has no callable conjugate method

As a workaround, casting columns to float will compute the descriptives. Note that to avoid KeyError the column names are given as strings rather than int.

import pandas as pd
from dask.diagnostics import ProgressBar

ProgressBar().register()
from dask.diagnostics import ResourceProfiler

rprof = ResourceProfiler(dt=0.5)
import dask.dataframe as dd

# generate dataframe with 3 different nullable dtypes and n rows
n = 1000

# note that column names are changed to strings rather than ints
test = pd.DataFrame(
    {
        "1": pd.Series(["a", pd.NA] * n, dtype=pd.StringDtype()),
        "2": pd.Series([1, pd.NA] * n, dtype=pd.Int64Dtype()),
        "3": pd.Series([0.56, pd.NA] * n, dtype=pd.Float64Dtype()),
    }
)

dd_df = dd.from_pandas(test, npartitions=2)  # convert to dask df

dd_df.to_parquet("test.parquet", engine="fastparquet")  # save as parquet directory

dd_df = dd.read_parquet("test.parquet", engine="fastparquet")  # load files back

dd_df.mean().compute()  # compute something
dd_df.astype({"2": "float"}).describe().compute()  # compute something
dd_df.count().compute()  # compute something
dd_df.max().compute()  # compute something

Upvotes: 1

Related Questions