Reputation: 1257
Based on the answer I received to an earlier question, I have written an ETL procedure that looks as follows:
import dask
import pandas as pd
from dask import delayed
from dask import dataframe as dd


def preprocess_files(filename):
    """Reads file, collects metadata and identifies lines not containing data."""
    ...
    return filename, metadata, skiprows


def load_file(filename, skiprows):
    """Loads the file into a pandas dataframe, skipping lines not containing data."""
    ...
    return df


def process_errors(filename, skiprows):
    """Calculates error metrics based on the information
    collected in the pre-processing step.
    """
    ...


def process_metadata(filename, metadata):
    """Analyses metadata collected in the pre-processing step."""
    ...


values = [delayed(preprocess_files)(fn) for fn in file_names]
filenames = [value[0] for value in values]
metadata = [value[1] for value in values]
skiprows = [value[2] for value in values]

error_results = [delayed(process_errors)(arg[0], arg[1])
                 for arg in zip(filenames, skiprows)]
meta_results = [delayed(process_metadata)(arg[0], arg[1])
                for arg in zip(filenames, metadata)]
dfs = [delayed(load_file)(arg[0], arg[1])
       for arg in zip(filenames, skiprows)]
...  # several delayed transformations defined on individual dataframes

# finally: categorize several dataframe columns and write them to HDF5
dfs = dd.from_delayed(dfs, meta=metaframe)
dfs = dfs.categorize(columns=[...])  # I would like to delay this
dfs.to_hdf(hdf_file_name, '/data', ...)  # I would also like to delay this

all_operations = error_results + meta_results  # + delayed operations on dask dataframe

# trigger all computation at once,
# allow re-using of data collected in the pre-processing step
dask.compute(*all_operations)
The ETL process goes through several steps. Several of the functions (process_metadata, process_errors, load_file) have a shared data dependency in that they all use information gathered in the pre-processing step. Ideally, the pre-processing step would only be run once and its results shared across the other steps. The problem I am having is that categorize and to_hdf trigger computation immediately, discarding the metadata and error data which would otherwise be processed further by process_errors and process_metadata.
I have been told that delaying operations on dask.dataframes can cause problems, which is why I would be very interested to know whether it is possible to trigger the entire computation (processing metadata, processing errors, loading dataframes, transforming dataframes and storing them in HDF format) at once, allowing the different processes to share the data collected in the pre-processing phase.
Upvotes: 3
Views: 1003
Reputation: 57251
There are two ways to approach your problem:
The to_hdf call accepts a compute= keyword argument that you can set to False. If False it will hand you back a dask.delayed value that you can compute whenever you feel like it.
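For example, a minimal sketch that reuses the variable names from the question (dfs, error_results, meta_results, hdf_file_name):

import dask

# Build the HDF5 write as a delayed task instead of executing it right away.
hdf_task = dfs.to_hdf(hdf_file_name, '/data', compute=False)

# Run everything in a single pass so the pre-processing results are shared.
all_operations = error_results + meta_results + [hdf_task]
dask.compute(*all_operations)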
The categorize call however does need to be computed immediately if you want to keep using dask.dataframe. We're unable to create a consistent dask.dataframe without going through the data more-or-less immediately. Recent improvements in Pandas around unioning categoricals will let us change this in the future, but for now you're stuck. If this is a blocker for you then you'll have to switch down to dask.delayed and handle things manually for a bit with df.to_delayed().
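A rough sketch of that manual route, assuming dfs is the dask dataframe from the question and that the categorical columns are known up front (col_a and col_b are placeholder names):

from dask import delayed

def categorize_partition(pdf, columns):
    # Convert the given columns of a single pandas partition to categoricals.
    return pdf.assign(**{col: pdf[col].astype('category') for col in columns})

parts = dfs.to_delayed()  # one delayed pandas dataframe per partition
parts = [delayed(categorize_partition)(p, ['col_a', 'col_b']) for p in parts]

Note that the categories produced this way are only consistent within each partition, which is exactly the limitation described above.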
If you use the distributed scheduler you can stage your computation by using the .persist method.
import dask
from dask.distributed import Executor

e = Executor()  # make a local "cluster" on your laptop

delayed_values = e.persist(*delayed_values)

...  # define further computations on delayed values

results = dask.compute(results)  # compute as normal
This will let you trigger some computations and still let you proceed onwards defining your computation. The values that you persist will stay in memory.
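Applied to the pipeline in the question, that staging could look roughly like this. It is only a sketch reusing the question's names (file_names, preprocess_files, process_errors, process_metadata); note also that Executor has since been renamed Client in newer versions of dask.distributed:

import dask
from dask import delayed
from dask.distributed import Executor

e = Executor()  # local scheduler and workers

# Run the pre-processing once and keep its results in memory.
values = e.persist([delayed(preprocess_files)(fn) for fn in file_names])

# Downstream tasks reuse the persisted pre-processing results
# instead of recomputing them.
error_results = [delayed(process_errors)(v[0], v[2]) for v in values]
meta_results = [delayed(process_metadata)(v[0], v[1]) for v in values]

dask.compute(*(error_results + meta_results))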
Upvotes: 4