Reputation: 87
I am experimenting with Dask, but I encountered a problem while using apply
after grouping.
I have a Dask DataFrame with a large number of rows. Let's consider for example the following
N=10000
df = pd.DataFrame({'col_1':np.random.random(N), 'col_2': np.random.random(N) })
ddf = dd.from_pandas(df, npartitions=8)
I want to bin the values of col_1
and I follow the solution from here
bins = np.linspace(0,1,11)
labels = list(range(len(bins)-1))
ddf2 = ddf.map_partitions(test_f, 'col_1',bins,labels)
where
def test_f(df,col,bins,labels):
return df.assign(bin_num = pd.cut(df[col],bins,labels=labels))
and this works as I expect it to.
Now I want to take the median value in each bin (taken from here)
median = ddf2.groupby('bin_num')['col_1'].apply(pd.Series.median).compute()
Having 10 bins, I expect median
to have 10 rows, but it actually has 80. The dataframe has 8 partitions so I guess that somehow the apply is working on each one individually.
However, If I want the mean and use mean
median = ddf2.groupby('bin_num')['col_1'].mean().compute()
it works and the output has 10 rows.
The question is then: what am I doing wrong that is preventing apply
from operating as mean
?
Upvotes: 2
Views: 2569
Reputation: 31
You are right! I was able to reproduce your problem on Dask 2.11.0. The good news is that there's a solution! It appears that the Dask groupby problem is specifically with the category type (pandas.core.dtypes.dtypes.CategoricalDtype). By casting the category column to another column type (float, int, str), then the groupby will work correctly.
Here's your code that I copied:
import dask.dataframe as dd
import pandas as pd
import numpy as np
def test_f(df, col, bins, labels):
return df.assign(bin_num=pd.cut(df[col], bins, labels=labels))
N = 10000
df = pd.DataFrame({'col_1': np.random.random(N), 'col_2': np.random.random(N)})
ddf = dd.from_pandas(df, npartitions=8)
bins = np.linspace(0,1,11)
labels = list(range(len(bins)-1))
ddf2 = ddf.map_partitions(test_f, 'col_1', bins, labels)
print(ddf2.groupby('bin_num')['col_1'].apply(pd.Series.median).compute())
which prints out the problem you mentioned
bin_num
0 NaN
1 NaN
2 NaN
3 NaN
4 NaN
...
5 0.550844
6 0.651036
7 0.751220
8 NaN
9 NaN
Name: col_1, Length: 80, dtype: float64
Here's my solution:
ddf3 = ddf2.copy()
ddf3["bin_num"] = ddf3["bin_num"].astype("int")
print(ddf3.groupby('bin_num')['col_1'].apply(pd.Series.median).compute())
which printed:
bin_num
9 0.951369
2 0.249150
1 0.149563
0 0.049897
3 0.347906
8 0.847819
4 0.449029
5 0.550608
6 0.652778
7 0.749922
Name: col_1, dtype: float64
@MRocklin or @TomAugspurger Would you be able to create a fix for this in a new release? I think there is sufficient reproducible code here. Thanks for all your hard work. I love Dask and use it every day ;)
Upvotes: 2
Reputation: 397
Maybe this warning is the key (Dask doc: SeriesGroupBy.apply) :
Pandas’ groupby-apply can be used to to apply arbitrary functions, including aggregations that result in one row per group. Dask’s groupby-apply will apply func once to each partition-group pair, so when func is a reduction you’ll end up with one row per partition-group pair. To apply a custom aggregation with Dask, use dask.dataframe.groupby.Aggregation.
Upvotes: 2