Gijs

Reputation: 33

How to remedy excessive hard disk usage (>>100 GB) by Dask DataFrame when shuffling data

I need to calculate statistics per segment of large (15-20 GB) CSV files, which I do with groupby() in Dask DataFrame.

The problem is that I need custom functions, because kurtosis and skew are not built into Dask, so I use groupby().apply(). However, this makes Dask write a tremendous amount of data to my Temp directory: more than 150 GB from a single run of the script! This fills up my hard drive and makes the script crash.

Is there a way to rewrite the code so that it avoids writing such an enormous amount of junk to my Temp directory?

Example code is given below:

Any help would be appreciated!

By the way, this page (https://docs.dask.org/en/latest/dataframe-groupby.html) suggests using an indexed column for the groupby(). Unfortunately, multi-indexing is not supported by Dask DataFrame, so that does not solve my problem.

import dask.dataframe as dd
import numpy as np
import pandas as pd
import scipy.stats as sps
from dask.diagnostics import ProgressBar

ddf = dd.read_csv('18_GB_csv_file.csv')

segmentations = {'seg1': ['col1', 'col2'],
                 'seg2': ['col1', 'col2', 'col3', 'col4'],
                 'seg3': ['col3', 'col4'],
                 'seg4': ['col1', 'col2', 'col5']}
data_cols = ['datacol1', 'datacol2', 'datacol3']


# Example 1: this runs fast and doesn't generate needless temp output.
# But it does not support "kurt" or "skew":

dd_comp = {}
for seg_group, seg_cols in segmentations.items():
    df_grouped = ddf.groupby(seg_cols)[data_cols]
    dd_comp[seg_group] = df_grouped.aggregate(['mean', 'std', 'min', 'max'])

with ProgressBar():
    segmented_stats = dd.compute(dd_comp)



# Example 2: includes also "kurt" and "skew". But it is painfully slow 
# and generates >150 GB of Temp output before running out of disk space

empty_segment = pd.DataFrame(index=data_cols,
                             columns=['mean', 'std', 'min', 'max',
                                      'kurt', 'skew'])

def segment_statistics(segment):
    stats = empty_segment.copy()
    for col in data_cols:
        stats.loc[col, 'mean'] = np.mean(segment[col])
        stats.loc[col, 'std'] = np.std(segment[col])
        stats.loc[col, 'min'] = np.min(segment[col])
        stats.loc[col, 'max'] = np.max(segment[col])
        stats.loc[col, 'skew'] = sps.skew(segment[col])
        stats.loc[col, 'kurt'] = sps.kurtosis(segment[col]) + 3  # Pearson (non-excess) kurtosis
    return stats

dd_comp = {}
for seg_group, seg_cols in segmentations.items():
    df_grouped = ddf.groupby(seg_cols)[data_cols]
    dd_comp[seg_group] = df_grouped.apply(segment_statistics,
                                          meta=empty_segment)

with ProgressBar():
    segmented_stats = dd.compute(dd_comp)

Upvotes: 2

Views: 646

Answers (1)

MRocklin

Reputation: 57319

It sounds like you might benefit from custom aggregations: https://docs.dask.org/en/latest/dataframe-groupby.html#aggregate

If you're able to come up with nice implementations for higher-order moments, those would also make nice contributions to the project.
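For instance, skew and kurtosis can be expressed as dd.Aggregation objects that accumulate power sums per partition and combine them afterwards, following the custom_var example on the linked docs page. The sketch below is an untested illustration, not a drop-in implementation: it assumes the biased ("population") definitions, so it should match scipy.stats.skew(..., bias=True) and scipy.stats.kurtosis(..., bias=True) + 3 from the question, and the helper names (moment_chunk, custom_skew, etc.) are made up for this example. If mixing built-in aggregation names with Aggregation objects in one list gives trouble, the custom ones can be computed in a separate aggregate() call.

import dask.dataframe as dd

def moment_chunk(s):
    # Per partition: collect the power sums needed for the central moments.
    return (s.count(), s.sum(),
            s.agg(lambda x: (x ** 2).sum()),
            s.agg(lambda x: (x ** 3).sum()),
            s.agg(lambda x: (x ** 4).sum()))

def moment_agg(n, s1, s2, s3, s4):
    # Across partitions: the partial sums simply add up.
    return n.sum(), s1.sum(), s2.sum(), s3.sum(), s4.sum()

def finalize_skew(n, s1, s2, s3, s4):
    m = s1 / n
    m2 = s2 / n - m ** 2                        # second central moment
    m3 = s3 / n - 3 * m * s2 / n + 2 * m ** 3   # third central moment
    return m3 / m2 ** 1.5

def finalize_kurt(n, s1, s2, s3, s4):
    m = s1 / n
    m2 = s2 / n - m ** 2
    m4 = s4 / n - 4 * m * s3 / n + 6 * m ** 2 * s2 / n - 3 * m ** 4
    return m4 / m2 ** 2                         # Pearson (non-excess) kurtosis

custom_skew = dd.Aggregation('skew', moment_chunk, moment_agg, finalize_skew)
custom_kurt = dd.Aggregation('kurt', moment_chunk, moment_agg, finalize_kurt)

# Possible replacement for the apply() loop in the question:
dd_comp = {}
for seg_group, seg_cols in segmentations.items():
    grouped = ddf.groupby(seg_cols)[data_cols]
    dd_comp[seg_group] = grouped.aggregate(
        ['mean', 'std', 'min', 'max', custom_skew, custom_kurt])

Because this stays within Dask's tree-reduction aggregation machinery instead of groupby().apply(), it should avoid the full shuffle to disk that is filling up the Temp directory.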

Upvotes: 1
