Reputation: 910
I am simulating here with dummy data what I actually want to do. The steps I need to perform:
1. Read a CSV into a Dask dataframe and persist it on the cluster.
2. Apply a custom transformation to every column except species.
3. Persist the transformed dataframe again so later queries are fast.
4. Run a groupby aggregation on each column separately, in parallel.
The code which I have simulated:
import dask.dataframe as dd
from dask.distributed import Client, as_completed, LocalCluster

cluster = LocalCluster(processes=False)
client = Client(cluster, asynchronous=True)

csv_loc = '/Users/apple/Downloads/iris.data'
# Of course, you need to give AWS creds here. Omitting it. Assuming you can read from S3 or otherwise.
df = dd.read_csv(csv_loc)
client.persist(df)

cols = ['sepal_length', 'sepal_width', 'petal_length', 'petal_width', 'species']

# This is needed because I am doing some custom operation on the actual data
for c in cols:
    if c != 'species':
        df[c] = df[c].map(lambda x: x * 10)
client.persist(df)  # Is this the trouble?

def agg_bivars(col_name):
    agg_df = df.groupby('species')[col_name].sum().compute()
    return {col_name: agg_df}

agg_futures = client.map(agg_bivars, ['sepal_length', 'sepal_width', 'petal_length', 'petal_width'])

for batch in as_completed(agg_futures, with_results=True).batches():
    for future, result in batch:
        print('result: {}'.format(result))

client.restart()
client.close()
cluster.close()
You can download the data from this link. It is a very standard, popular dataset available online.
The result I get: the same groupby result for every column.
Expected result: a different groupby result for each column.
Result:
result: {'sepal_width': species
Iris-setosa 2503.0
Iris-versicolor 2968.0
Iris-virginica 3294.0
Name: sepal_length, dtype: float64}
result: {'sepal_length': species
Iris-setosa 2503.0
Iris-versicolor 2968.0
Iris-virginica 3294.0
Name: sepal_length, dtype: float64}
result: {'petal_width': species
Iris-setosa 2503.0
Iris-versicolor 2968.0
Iris-virginica 3294.0
Name: sepal_length, dtype: float64}
result: {'petal_length': species
Iris-setosa 2503.0
Iris-versicolor 2968.0
Iris-virginica 3294.0
Name: sepal_length, dtype: float64}
Process finished with exit code 0
If I only do the groupby on df, it works fine. But the issue here is that I have to do some transformation on the entire df before the groupby on each column separately. Note that I am calling client.persist(df) twice. I did it the second time because I want the new transformations to be persisted so that I can query them quickly.
Upvotes: 2
Views: 329
Reputation: 13437
It looks to me like you are overcomplicating things.
import pandas as pd

df = pd.read_csv("iris.csv")
df[df.columns[:-1]] = df[df.columns[:-1]] * 10  # scale every numeric column in one vectorized assignment
df.groupby("species").sum()
sepal_length sepal_width petal_length petal_width
species
setosa 2503.0 1709.0 732.0 122.0
versicolor 2968.0 1385.0 2130.0 663.0
virginica 3294.0 1487.0 2776.0 1013.0
import dask.dataframe as dd

df = dd.read_csv("iris.csv")
for col in df.columns[:-1]:
    df[col] = df[col] * 10
df.groupby("species").sum().compute()
sepal_length sepal_width petal_length petal_width
species
setosa 2503.0 1709.0 732.0 122.0
versicolor 2968.0 1385.0 2130.0 663.0
virginica 3294.0 1487.0 2776.0 1013.0
Then if you want the result as a dict, you just need to add to_dict() to the output.
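For example, a minimal sketch reusing the dask dataframe from above:
result_dict = df.groupby("species").sum().compute().to_dict()
This gives a nested dict keyed first by column name and then by species.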
Upvotes: 0
Reputation: 1332
The problem is with the compute() inside the agg_bivars function. Each call to agg_bivars runs as a task on a worker, and calling compute() from within a task on the same cluster is generally unsafe; returning the lazy aggregation and computing it on the client side avoids this.
Try the following code:
def agg_bivars(col_name):
    agg_df = df.groupby('species')[col_name].sum()  # no .compute() here
    return {col_name: agg_df}

agg_futures = client.map(agg_bivars, ['sepal_length', 'sepal_width', 'petal_length', 'petal_width'])

for batch in as_completed(futures=agg_futures, with_results=True).batches():
    for future, result in batch:
        print(f'result: {list(result.values())[0].compute()}')
Results:
result: species
setosa 2503.0
versicolor 2968.0
virginica 3294.0
Name: sepal_length, dtype: float64
result: species
setosa 1709.0
versicolor 1385.0
virginica 1487.0
Name: sepal_width, dtype: float64
result: species
setosa 732.0
versicolor 2130.0
virginica 2776.0
Name: petal_length, dtype: float64
result: species
setosa 122.0
versicolor 663.0
virginica 1013.0
Name: petal_width, dtype: float64
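Alternatively, if you really do need to materialize the result inside the task itself, a minimal sketch would use worker_client, which dask.distributed provides for running computations from within tasks (assuming the same df as in your question):

from dask.distributed import worker_client

def agg_bivars(col_name):
    # obtain a client connected to the same cluster from inside the running task
    with worker_client() as c:
        agg_df = c.compute(df.groupby('species')[col_name].sum()).result()
    return {col_name: agg_df}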
Upvotes: 2