I have a parquet folder created with dask containing multiple files of about 100MB each. When I load the dataframe with `df = dask.dataframe.read_parquet(path_to_parquet_folder)` and run any sort of computation (such as `df.describe().compute()`), my kernel crashes.
Things I have noticed:
I tried to create a reproducible example, without success, but I discovered some other oddities, seemingly all related to the newer pandas dtypes that I'm using:
```python
import pandas as pd
from dask.diagnostics import ProgressBar
ProgressBar().register()
from dask.diagnostics import ResourceProfiler
rprof = ResourceProfiler(dt=0.5)
import dask.dataframe as dd

# generate dataframe with 3 different nullable dtypes and n rows
n = 10000000
test = pd.DataFrame({
    1:pd.Series(['a', pd.NA]*n, dtype = pd.StringDtype()),
    2:pd.Series([1, pd.NA]*n, dtype = pd.Int64Dtype()),
    3:pd.Series([0.56, pd.NA]*n, dtype = pd.Float64Dtype())
})
dd_df = dd.from_pandas(test, npartitions = 2) # convert to dask df
dd_df.to_parquet('test.parquet') # save as parquet directory
dd_df = dd.read_parquet('test.parquet') # load files back
dd_df.mean().compute() # compute something
dd_df.describe().compute() # compute something
dd_df.count().compute() # compute something
dd_df.max().compute() # compute something
```
Output, respectively:

```
KeyError: "None of [Index(['2', '1', '3'], dtype='object')] are in the [columns]"
KeyError: "None of [Index(['2', '1', '3'], dtype='object')] are in the [columns]"
Kernel appears to have died.
KeyError: "None of [Index(['2', '1', '3'], dtype='object')] are in the [columns]"
```
It seems that the dtypes are preserved through the parquet round trip, but dask has trouble actually doing anything with these columns.
Python version: 3.9.7
dask version: 2021.11.2
Answer:
It seems the main error is due to `NAType`, which is not yet fully supported by `numpy` (version 1.21.4):
```
~/some_env/python3.8/site-packages/numpy/core/_methods.py in _var(a, axis, dtype, out, ddof, keepdims, where)
    240         # numbers and complex types with non-native byteorder
    241         else:
--> 242             x = um.multiply(x, um.conjugate(x), out=x).real
    243 
    244         ret = umr_sum(x, axis, dtype, out, keepdims=keepdims, where=where)

TypeError: loop of ufunc does not support argument 0 of type NAType which has no callable conjugate method
```
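A minimal pandas/NumPy sketch (no dask involved) of the failure in this traceback and of why casting to float helps. It assumes, as the traceback suggests, that the nullable values reach NumPy as an object array still containing `pd.NA`; the exact dask code path that gets there is not shown here.

```python
import numpy as np
import pandas as pd

# A nullable Float64 column holding pd.NA, like column 3 in the question.
s = pd.Series([0.56, pd.NA, 0.56], dtype=pd.Float64Dtype())

# Forcing it into a NumPy object array keeps pd.NA as a Python object.
as_object = s.to_numpy(dtype=object)

try:
    np.var(as_object)
except TypeError as exc:
    # NumPy's variance calls conjugate() on each object element; NAType has none.
    print("object array failed:", exc)

# Casting to float first turns pd.NA into NaN, which NumPy understands.
as_float = s.astype("float64").to_numpy()
print(as_float.dtype)       # float64
print(np.nanvar(as_float))  # 0.0
```

The `NaN`-aware `np.nanvar` is used here only to show that the float array is usable; it is not necessarily what dask calls internally.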
As a workaround, casting the nullable columns to `float` allows the descriptive statistics to be computed. Note that, to avoid the `KeyError`, the column names are given as strings rather than `int`s.
```python
import pandas as pd
from dask.diagnostics import ProgressBar
ProgressBar().register()
from dask.diagnostics import ResourceProfiler
rprof = ResourceProfiler(dt=0.5)
import dask.dataframe as dd

# generate dataframe with 3 different nullable dtypes and n rows
n = 1000

# note that column names are changed to strings rather than ints
test = pd.DataFrame(
    {
        "1": pd.Series(["a", pd.NA] * n, dtype=pd.StringDtype()),
        "2": pd.Series([1, pd.NA] * n, dtype=pd.Int64Dtype()),
        "3": pd.Series([0.56, pd.NA] * n, dtype=pd.Float64Dtype()),
    }
)

dd_df = dd.from_pandas(test, npartitions=2)  # convert to dask df
dd_df.to_parquet("test.parquet", engine="fastparquet")  # save as parquet directory
dd_df = dd.read_parquet("test.parquet", engine="fastparquet")  # load files back
dd_df.mean().compute()  # compute something
dd_df.astype({"2": "float"}).describe().compute()  # compute something
dd_df.count().compute()  # compute something
dd_df.max().compute()  # compute something
```
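A minimal pandas-only sketch of the same two ideas from the workaround above, assuming the frame is built in pandas before it is handed to dask: make the column labels strings before writing to parquet, and build an `astype` mapping that casts every nullable integer/float column to `float64` (so `pd.NA` becomes `NaN`). The helper name `float_cast_map` is hypothetical, not a pandas or dask API.

```python
import pandas as pd
from pandas.api.types import (
    is_extension_array_dtype,
    is_float_dtype,
    is_integer_dtype,
)


def float_cast_map(dtypes: pd.Series) -> dict:
    """Hypothetical helper: map each nullable Int*/Float* column to "float64".

    Columns with other dtypes (e.g. StringDtype) are left out of the mapping.
    """
    return {
        col: "float64"
        for col, dtype in dtypes.items()
        if is_extension_array_dtype(dtype)
        and (is_integer_dtype(dtype) or is_float_dtype(dtype))
    }


n = 3
test = pd.DataFrame(
    {
        1: pd.Series(["a", pd.NA] * n, dtype=pd.StringDtype()),
        2: pd.Series([1, pd.NA] * n, dtype=pd.Int64Dtype()),
        3: pd.Series([0.56, pd.NA] * n, dtype=pd.Float64Dtype()),
    }
)

# Turn the int labels 1, 2, 3 into the strings "1", "2", "3" before writing.
test = test.rename(columns=str)

mapping = float_cast_map(test.dtypes)
print(mapping)  # {'2': 'float64', '3': 'float64'}
print(test.astype(mapping).describe())
```

With dask, the same mapping should work as `dd_df.astype(float_cast_map(dd_df.dtypes))`, since a dask DataFrame exposes `.dtypes` as a pandas Series and `astype` accepts a dict; this is untested against the parquet round trip in the question.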