waithira

Reputation: 340

GroupBy /Map_partitions in Dask

I have a Dask dataframe with 2438 partitions, each about 1.1 GB, for a total of roughly 7B rows. I want to do a groupby on multiple columns and aggregate one of the columns:

agg = {'total_x': 'sum'}
df_s = df_s.map_partitions(
    lambda dff: dff.groupby(['y', 'z', 'a', 'b', 'c']).agg(agg),
    meta=pd.DataFrame({'y': 'int', 'z': 'int', 'a': 'int',
                       'b': 'int', 'c': 'object', 'total_x': 'f64'}),
)

I get the error: "If using all scalar values, you must pass an index".

How do I resolve that? I have 160 GB of RAM and 24 workers; is that computation even possible in this environment?

If not, what is another feasible way?

Upvotes: 2

Views: 1460

Answers (1)

SultanOrazbayev

Reputation: 16551

As suggested by @Michael Delgado, the problem is in the meta definition: passing a dict of all scalar values to pd.DataFrame is what raises that error. This fixes the definition of meta:

import pandas as pd

dtypes = {
    "y": "int64",
    "z": "int64",
    "a": "int64",
    "b": "int64",
    "c": "object",
    "total_x": "float64",
}
meta = pd.DataFrame({col: pd.Series(dtype=dt) for col, dt in dtypes.items()})
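For reference, the error in the question can be reproduced directly with plain pandas: constructing a DataFrame from a dict whose values are all scalars is exactly what triggers it, while an empty per-column construction avoids it and still records the dtypes that Dask needs.

```python
import pandas as pd

# A dict of all-scalar values raises the error from the question,
# because pandas cannot infer how many rows to create.
try:
    pd.DataFrame({"y": "int", "total_x": "f64"})
except ValueError as err:
    print(err)  # If using all scalar values, you must pass an index

# An empty frame built column by column works and carries real dtypes.
meta = pd.DataFrame({"y": pd.Series(dtype="int64"),
                     "total_x": pd.Series(dtype="float64")})
print(len(meta), list(meta.columns))
```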

Then, this meta can be passed as a kwarg. See the reproducible example below:

import dask
import pandas as pd

dtypes = {"name": "object", "x": "float64"}
meta = pd.DataFrame({col: pd.Series(dtype=dt) for col, dt in dtypes.items()})


agg = {"x": "sum"}
ddf = dask.datasets.timeseries().map_partitions(
    lambda df: df.groupby(["name"], as_index=False).agg(agg), meta=meta
)

ddf.head()
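One caveat worth noting (not part of the original answer): a groupby inside map_partitions only aggregates within each partition, so a key that appears in several partitions yields one partial row per partition. If keys span partitions, a second, global aggregation over the combined partials is needed. A minimal pandas-only sketch of that two-stage pattern, using hypothetical data:

```python
import pandas as pd

# Two "partitions" with an overlapping key ("a").
part1 = pd.DataFrame({"name": ["a", "b"], "x": [1.0, 2.0]})
part2 = pd.DataFrame({"name": ["a", "c"], "x": [3.0, 4.0]})

# Stage 1: per-partition partial sums (what map_partitions computes).
partials = [p.groupby("name", as_index=False)["x"].sum()
            for p in (part1, part2)]

# Stage 2: concatenate the partials and sum again for the global result.
result = pd.concat(partials).groupby("name", as_index=False)["x"].sum()
print(result)  # a -> 4.0, b -> 2.0, c -> 4.0
```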

Upvotes: 1
