Tom Hemmes

Reputation: 2060

Filtering grouped df in Dask

Related to this similar question for Pandas: filtering grouped df in pandas

Goal: to eliminate groups based on an expression applied to a different column than the groupby column.

Problem: filter is not implemented for grouped dataframes in Dask.

Tried: groupby and apply to eliminate certain groups, which raises a ValueError, presumably because the apply function is expected to always return something?

In [16]:
def filter_empty(df):
    if not df.label.values.all(4):
        return df

df_nonempty = df_norm.groupby('hash').apply(filter_empty, meta=meta)

In [17]:
len(df_nonempty.hash.unique())
...

<ipython-input-16-6da6d9b6c069> in filter_empty()
      1 def filter_empty(df):
----> 2     if not df.label.values.all(4):
      3         return df
      4 
      5 df_nonempty = df_norm.groupby('hash').apply(filter_empty, meta=meta)

/opt/conda/lib/python3.5/site-packages/numpy/core/_methods.py in _all()
     39 
     40 def _all(a, axis=None, dtype=None, out=None, keepdims=False):
---> 41     return umr_all(a, axis, dtype, out, keepdims)
     42 
     43 def _count_reduce_items(arr, axis):

ValueError: 'axis' entry is out of bounds

Question: Is there another way to achieve the Dask equivalent of Pandas grouped.filter(lambda x: len(x) > 1)? Or is my groupby apply simply implemented wrongly?
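
For reference, the Pandas behaviour being targeted looks like this (a minimal sketch with throwaway data; pdf is just an illustrative name, not the df from the example below):

import pandas as pd

pdf = pd.DataFrame({'A': list('aacaaa'), 'B': [4, 5, 4, 5, 5, 4]})
# keep only the groups of 'A' that contain more than one row
kept = pdf.groupby('A').filter(lambda x: len(x) > 1)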

Example

import numpy as np
import pandas as pd
import dask.dataframe as dd

In [3]:
df = pd.DataFrame({'A':list('aacaaa'),
                   'B':[4,5,4,5,5,4],
                   'C':[7,8,9,4,2,3],
                   'D':[1,3,5,7,1,0],
                   'E':[5,3,6,9,2,4],
                   'F':list('aaabbc')})
df = dd.from_pandas(df, npartitions=1)

In [8]:
df.A.unique().compute()
Out[8]:
0    a
1    c
Name: A, dtype: object

In [6]:
def filter_4(df):
    if not df.B.values.all(4):
        return df

df_notalla = df.groupby('A').apply(filter_4, meta=df)

In [10]:
df_notalla.A.unique().compute()
---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
<ipython-input-10-894a491faa57> in <module>()
----> 1 df_notalla.A.unique().compute()

...

<ipython-input-6-ef10326ae42a> in filter_4(df)
      1 def filter_4(df):
----> 2     if not df.B.values.all(4):
      3         return df
      4 
      5 df_notalla = df.groupby('A').apply(filter_4, meta=df)

/opt/conda/lib/python3.5/site-packages/numpy/core/_methods.py in _all(a, axis, dtype, out, keepdims)
     39 
     40 def _all(a, axis=None, dtype=None, out=None, keepdims=False):
---> 41     return umr_all(a, axis, dtype, out, keepdims)
     42 
     43 def _count_reduce_items(arr, axis):

ValueError: 'axis' entry is out of bounds
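
For what it's worth, the ValueError comes from NumPy rather than from the groupby apply itself: ndarray.all takes axis as its first positional argument, so B.values.all(4) asks for a reduction over axis 4 of a 1-D array. A minimal demonstration:

import numpy as np

arr = np.array([4, 5, 4])
arr.all()      # True: every element is truthy; this does NOT compare against 4
# arr.all(4)   # ValueError: 'axis' entry is out of bounds

A comparison such as (arr == 4).all() is presumably what was intended.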

Upvotes: 3

Views: 2296

Answers (2)

jezrael

Reputation: 863741

I think you can use groupby + size first, then map the sizes back onto the column (this acts like transform, which is not implemented in dask either), and last filter by boolean indexing:

df = pd.DataFrame({'A':list('aacaaa'),
                   'B':[4,5,4,5,5,4],
                   'C':[7,8,9,4,2,3],
                   'D':[1,3,5,7,1,0],
                   'E':[5,3,6,9,2,4],
                   'F':list('aaabbc')})

print (df)
   A  B  C  D  E  F
0  a  4  7  1  5  a
1  a  5  8  3  3  a
2  c  4  9  5  6  a
3  a  5  4  7  9  b
4  a  5  2  1  2  b
5  a  4  3  0  4  c

a = df.groupby('F')['A'].size()
print (a)
F
a    3
b    2
c    1
Name: A, dtype: int64

s = df['F'].map(a)
print (s)
0    3
1    3
2    3
3    2
4    2
5    1
Name: F, dtype: int64

df = df[s > 1]
print (df)
   A  B  C  D  E  F
0  a  4  7  1  5  a
1  a  5  8  3  3  a
2  c  4  9  5  6  a
3  a  5  4  7  9  b
4  a  5  2  1  2  b
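
The same size + map + boolean indexing pattern should carry over to Dask. A sketch, assuming the per-group sizes are small enough to compute eagerly into a pandas Series (dask's Series.map accepts a pandas Series as the mapping):

import dask.dataframe as dd

ddf = dd.from_pandas(df, npartitions=2)
sizes = ddf.groupby('F')['A'].size().compute()  # one row per group, small
mask = ddf['F'].map(sizes) > 1                  # per-row group size
print (ddf[mask].compute())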

EDIT:

I think groupby is not necessary here:

df_notall4 = df[df.C != 4].drop_duplicates(subset=['A','D'])['D'].compute()

But if you really need it:

def filter_4(x):
    return x[x.C != 4]

df_notall4 = df.groupby('A').apply(filter_4, meta=df).D.unique().compute()
print (df_notall4)
0    1
1    3
2    0
3    5
Name: D, dtype: int64

Upvotes: 3

Tom Hemmes

Reputation: 2060

Thanks to @jezrael, I reviewed my implementation and created the following solution (see the example I provided above).

df_notall4 = []
for d in list(df[df.C != 4].D.unique().compute()):
    df_notall4.append(df.groupby('D').get_group(d))

df_notall4 = dd.concat(df_notall4, interleave_partitions=True)

Which results in

In [8]:
df_notall4.D.unique().compute()
Out[8]:
0    1
1    3
2    5
3    0
Name: D, dtype: object
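
A more compact variant of the same idea would be a single boolean mask with isin, instead of concatenating one get_group per value (a sketch against the same df):

keep = df[df.C != 4].D.unique().compute()
df_notall4 = df[df.D.isin(list(keep))]

This keeps everything as one filtered graph rather than one get_group task per distinct value of D.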

Upvotes: 1
