Ale

Reputation: 87

Dask apply with custom function

I am experimenting with Dask, but I encountered a problem while using apply after grouping.

I have a Dask DataFrame with a large number of rows. Consider, for example, the following:

import numpy as np
import pandas as pd
import dask.dataframe as dd
N = 10000
df = pd.DataFrame({'col_1': np.random.random(N), 'col_2': np.random.random(N)})
ddf = dd.from_pandas(df, npartitions=8)

I want to bin the values of col_1, following the solution from here:

bins = np.linspace(0,1,11)
labels = list(range(len(bins)-1))
ddf2 = ddf.map_partitions(test_f, 'col_1',bins,labels)

where

def test_f(df,col,bins,labels):
    return df.assign(bin_num = pd.cut(df[col],bins,labels=labels))

and this works as I expect it to.

Now I want to take the median value in each bin (taken from here)

median = ddf2.groupby('bin_num')['col_1'].apply(pd.Series.median).compute()

Having 10 bins, I expect median to have 10 rows, but it actually has 80. The dataframe has 8 partitions so I guess that somehow the apply is working on each one individually.

However, if I want the mean and use mean instead:

mean = ddf2.groupby('bin_num')['col_1'].mean().compute()

it works and the output has 10 rows.

The question is then: what am I doing wrong that prevents apply from behaving like mean?

Upvotes: 2

Views: 2569

Answers (2)

bubblecue

Reputation: 31

You are right! I was able to reproduce your problem on Dask 2.11.0. The good news is that there is a solution. The Dask groupby problem appears to be specific to grouping by a column of category type (pandas.core.dtypes.dtypes.CategoricalDtype). If you cast the category column to another type (float, int, str), the groupby works correctly.

Here's your code that I copied:

import dask.dataframe as dd
import pandas as pd
import numpy as np


def test_f(df, col, bins, labels):
    return df.assign(bin_num=pd.cut(df[col], bins, labels=labels))

N = 10000
df = pd.DataFrame({'col_1': np.random.random(N), 'col_2': np.random.random(N)})
ddf = dd.from_pandas(df, npartitions=8)

bins = np.linspace(0,1,11)
labels = list(range(len(bins)-1))
ddf2 = ddf.map_partitions(test_f, 'col_1', bins, labels)

print(ddf2.groupby('bin_num')['col_1'].apply(pd.Series.median).compute())

which prints out the problem you mentioned:

bin_num
0         NaN
1         NaN
2         NaN
3         NaN
4         NaN
       ...   
5    0.550844
6    0.651036
7    0.751220
8         NaN
9         NaN
Name: col_1, Length: 80, dtype: float64

Here's my solution:

ddf3 = ddf2.copy()
ddf3["bin_num"] = ddf3["bin_num"].astype("int")

print(ddf3.groupby('bin_num')['col_1'].apply(pd.Series.median).compute())

which printed:

bin_num
9    0.951369
2    0.249150
1    0.149563
0    0.049897
3    0.347906
8    0.847819
4    0.449029
5    0.550608
6    0.652778
7    0.749922
Name: col_1, dtype: float64
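
One plausible reading of the 80 rows, sketched here with plain pandas rather than Dask: when the grouping column is categorical, each partition reports a row for every category, including the ones it never saw. The split into 8 "partitions" below is a hypothetical stand-in for whatever Dask does internally, and observed=False is passed explicitly because the pandas default for it depends on the version.

```python
import numpy as np
import pandas as pd

rng = np.random.default_rng(0)
N = 10000
df = pd.DataFrame({"col_1": rng.random(N)})
bins = np.linspace(0, 1, 11)
labels = list(range(len(bins) - 1))
df["bin_num"] = pd.cut(df["col_1"], bins, labels=labels)

# Hypothetical stand-in for a shuffle: every bin is sent to exactly one
# of 8 "partitions", so each partition sees only one or two bins.
n_parts = 8
part_id = df["bin_num"].cat.codes % n_parts
parts = [df[part_id == i] for i in range(n_parts)]

# Categorical key, all categories reported: 10 rows per partition.
all_cats = pd.concat(
    [p.groupby("bin_num", observed=False)["col_1"].median() for p in parts]
)

# Same key, only the categories actually present in each partition.
only_seen = pd.concat(
    [p.groupby("bin_num", observed=True)["col_1"].median() for p in parts]
)

print(len(all_cats), int(all_cats.notna().sum()))
print(len(only_seen))
```

In this sketch the first result has 8 x 10 rows, most of them NaN, which matches the shape of the output above. Casting bin_num to int removes the category list, so there are no unobserved groups left to report.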

@MRocklin or @TomAugspurger Would you be able to create a fix for this in a new release? I think there is sufficient reproducible code here. Thanks for all your hard work. I love Dask and use it every day ;)

Upvotes: 2

ava_punksmash

Reputation: 397

Maybe this warning from the Dask documentation for SeriesGroupBy.apply is the key:

Pandas’ groupby-apply can be used to apply arbitrary functions, including aggregations that result in one row per group. Dask’s groupby-apply will apply func once to each partition-group pair, so when func is a reduction you’ll end up with one row per partition-group pair. To apply a custom aggregation with Dask, use dask.dataframe.groupby.Aggregation.
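
To make the "one row per partition-group pair" point concrete, here is a minimal plain-pandas sketch (no Dask involved). The 4-way split and the column names key and val are assumptions for illustration only. It also shows why mean is easy to combine across partitions, using the same three steps (chunk, agg, finalize) that dask.dataframe.groupby.Aggregation takes as arguments, while a median applied per partition is not.

```python
import numpy as np
import pandas as pd

rng = np.random.default_rng(1)
df = pd.DataFrame({"key": rng.integers(0, 3, 900), "val": rng.random(900)})

# Hypothetical partitions: every 4th row goes to the same partition.
parts = [df.iloc[i::4] for i in range(4)]

# A reduction applied separately in each partition gives one row per
# partition-group pair: 4 partitions x 3 groups.
partial_medians = pd.concat([p.groupby("key")["val"].median() for p in parts])

# mean decomposes cleanly into three steps.
# chunk: per-partition, per-group sums and counts
chunks = [p.groupby("key")["val"].agg(["sum", "count"]) for p in parts]
# agg: add the partial results for each group across partitions
combined = pd.concat(chunks).groupby(level=0).sum()
# finalize: turn the combined partials into the final statistic
mean_from_parts = combined["sum"] / combined["count"]

exact_mean = df.groupby("key")["val"].mean()
print(len(partial_medians))
print(np.allclose(mean_from_parts, exact_mean))
```

The median has no equivalent small summary: the median of per-partition medians is generally not the overall median, so a custom median aggregation has to carry the group's values (or an approximation of them) through the agg step.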

Upvotes: 2
