Reputation: 610
I have a data set that contains one timeseries per file. I am really happy with how dask handles ~1k files (one directory in my case) on our cluster. But I have around 50 directories.
The funny thing is that building the dask graph seems to consume more memory and CPU than the actual problem, and this happens only on the scheduler. The following bare-minimum code should only create the graph, but it already seems to do a lot of pandas work on the scheduler:
df = intake.open_csv(
    TRAIN_PATH + "{folder_name}/{file_name}.csv",
    csv_kwargs={"dtype": dtypes.to_dict()},
).to_dask()
features = df.groupby(["folder_name", "file_name"]).agg(["min", "max"])
Note: I am using intake for the path patterns here. I have also tried read_csv from dask with include_path_column=True and path as the group key. I managed to make the steps above faster that way, but then features.compute() seems to expand the graph, leading to effectively the same situation: the scheduler hangs before the cluster starts running.
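For reference, the read_csv variant looks roughly like this (a sketch, assuming TRAIN_PATH and dtypes from the snippet above and a glob pattern standing in for the folder/file layout):
import dask.dataframe as dd

# include_path_column=True adds a "path" column with the source file of each row
df = dd.read_csv(
    TRAIN_PATH + "*/*.csv",
    dtype=dtypes.to_dict(),
    include_path_column=True,
)
features = df.groupby("path").agg(["min", "max"])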
The easiest fix would be to use a dask antipattern and loop over the directories, roughly as sketched below. However, I wonder if this can be done better (it is for educational purposes, so style and simplicity are important).
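For clarity, this is roughly the loop I mean (a sketch; folders is a placeholder for the list of the ~50 directory names):
import intake
import pandas as pd

# One small graph per directory, computed separately; the per-directory
# results are concatenated in pandas on the client.
results = []
for folder in folders:
    df = intake.open_csv(
        TRAIN_PATH + folder + "/{file_name}.csv",
        csv_kwargs={"dtype": dtypes.to_dict()},
    ).to_dask()
    results.append(df.groupby("file_name").agg(["min", "max"]).compute())
features = pd.concat(results)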
Is there a nice way to read many files in one graph without the graph size scaling worse than linearly?
Upvotes: 1
Views: 355
Reputation: 610
This is a solution I came up with using map_partitions that seems to generate a decent graph:
import dask.bag as db
import pandas as pd

def get_features(file):
    # Read one csv, tag it with its path and reduce it to min/max per column
    data = pd.read_csv(file)
    data["path"] = file
    return data.groupby("path").agg(["min", "max"])

# files is the flat list of csv paths across all directories
csvs = db.from_sequence(files)
features = (
    csvs.map_partitions(lambda part: [get_features(f) for f in part])
    .reduction(pd.concat, pd.concat)
    .compute()
)
The solution does not generalize beyond this use case, though: for example, it breaks down if a feature spans more than one file. It also does not build a dask dataframe, which would be bad if there are too many groups per file; a variant that does is sketched below.
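For comparison, a variant that does build a dask dataframe would look roughly like this (a sketch reusing get_features and files from above; without an explicit meta, dask computes one partition up front to infer the schema):
import dask
import dask.dataframe as dd

# One delayed task per file; from_delayed stitches the per-file results
# into a dask dataframe with one partition per file.
parts = [dask.delayed(get_features)(f) for f in files]
features_ddf = dd.from_delayed(parts)
features = features_ddf.compute()
Keeping the result as a dask dataframe avoids concatenating everything on the client, at the cost of one partition per file.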
Upvotes: 0