spiralarchitect

Reputation: 910

Dask groupby over each column separately gives wrong results

I am simulating here with dummy data what I actually want to do. The steps I need to perform:

  1. Apply some transformation to each column separately.
  2. Do a groupby operation to aggregate some metrics for each column against a target column.

The code I have simulated:

import dask.dataframe as dd
from dask.distributed import Client, as_completed, LocalCluster

cluster = LocalCluster(processes=False)

client = Client(cluster, asynchronous=True)

csv_loc = '/Users/apple/Downloads/iris.data'
df = dd.read_csv(csv_loc) # Of course, you need to give AWS creds here. Omitting it. Assuming you can read from S3 or otherwise.
client.persist(df)
cols = ['sepal_length', 'sepal_width' ,'petal_length' ,'petal_width', 'species']

# This is needed because I am doing some custom operation on actual data
for c in cols:
    if c != 'species':
        df[c] = df[c].map(lambda x: x*10)
client.persist(df) # Is this the trouble?

def agg_bivars(col_name):
    agg_df = df.groupby('species')[col_name].sum().compute()
    return {col_name : agg_df}

agg_futures = client.map(agg_bivars, ['sepal_length', 'sepal_width' ,'petal_length' ,'petal_width'])

for batch in as_completed(agg_futures, with_results=True).batches():
   for future, result in batch:
       print('result: {}'.format(result))


client.restart()
client.close()
cluster.close()

You can download the data from this link. It is a very standard, popular dataset available online.

The result I get: the same groupby result for every column.

Expected result: a different groupby result for each column.

Result:

result: {'sepal_width': species
Iris-setosa        2503.0
Iris-versicolor    2968.0
Iris-virginica     3294.0
Name: sepal_length, dtype: float64}
result: {'sepal_length': species
Iris-setosa        2503.0
Iris-versicolor    2968.0
Iris-virginica     3294.0
Name: sepal_length, dtype: float64}
result: {'petal_width': species
Iris-setosa        2503.0
Iris-versicolor    2968.0
Iris-virginica     3294.0
Name: sepal_length, dtype: float64}
result: {'petal_length': species
Iris-setosa        2503.0
Iris-versicolor    2968.0
Iris-virginica     3294.0
Name: sepal_length, dtype: float64}

Process finished with exit code 0

If I only do a groupby on df, it works fine. But the issue here is that I have to do some transformation on the entire df before doing the groupby on each column separately. Note that I call client.persist(df) twice. I did it the second time because I want the new transformations to be persisted as well, so that I can query quickly.

Upvotes: 2

Views: 329

Answers (2)

rpanai

Reputation: 13437

It looks to me like you are overcomplicating things.

Pandas

import pandas as pd
df = pd.read_csv("iris.csv")

df[df.columns[:-1]] = df[df.columns[:-1]] * 10

df.groupby("species").sum()

            sepal_length  sepal_width  petal_length  petal_width
species                                                         
setosa            2503.0       1709.0         732.0        122.0
versicolor        2968.0       1385.0        2130.0        663.0
virginica         3294.0       1487.0        2776.0       1013.0

Dask

import dask.dataframe as dd

df = dd.read_csv("iris.csv")
for col in df.columns[:-1]:
    df[col] = df[col]*10

df.groupby("species").sum().compute()

            sepal_length  sepal_width  petal_length  petal_width
species                                                         
setosa            2503.0       1709.0         732.0        122.0
versicolor        2968.0       1385.0        2130.0        663.0
virginica         3294.0       1487.0        2776.0       1013.0

Then, if you want the result as a dict, you just need to append to_dict() to the output.
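For instance, a small sketch of the same pipeline with inline stand-in data (hypothetical values, one row per species) rather than the iris file:

```python
import pandas as pd

# Inline stand-in for the iris data (hypothetical values).
df = pd.DataFrame({
    "sepal_length": [5.0, 7.0, 6.5],
    "species": ["setosa", "versicolor", "virginica"],
})

# Same transformation as above: scale all numeric columns by 10.
df[df.columns[:-1]] = df[df.columns[:-1]] * 10

# Aggregate per species, then convert the result to a nested dict.
out = df.groupby("species").sum().to_dict()
print(out)
# {'sepal_length': {'setosa': 50.0, 'versicolor': 70.0, 'virginica': 65.0}}
```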

Upvotes: 0

skibee

Reputation: 1332

The problem is the compute() call inside the agg_bivars function.

Try the following code:

def agg_bivars(col_name):
    agg_df = df.groupby('species')[col_name].sum()  #.compute()
    return {col_name : agg_df}

agg_futures = client.map(agg_bivars, ['sepal_length', 'sepal_width' ,'petal_length' ,'petal_width'])

for batch in as_completed(futures=agg_futures, with_results=True).batches():    
    for future, result in batch:        
        print(f'result: {list(result.values())[0].compute()}')

This results in:

result: species
setosa        2503.0
versicolor    2968.0
virginica     3294.0
Name: sepal_length, dtype: float64
result: species
setosa        1709.0
versicolor    1385.0
virginica     1487.0
Name: sepal_width, dtype: float64
result: species
setosa         732.0
versicolor    2130.0
virginica     2776.0
Name: petal_length, dtype: float64
result: species
setosa         122.0
versicolor     663.0
virginica     1013.0
Name: petal_width, dtype: float64

Upvotes: 2
