sim

Reputation: 1257

Re-using intermediate results in Dask (mixing delayed and dask.dataframe)

Based on the answer I received to an earlier question, I have written an ETL procedure that looks as follows:

import dask
import pandas as pd
from dask import delayed
from dask import dataframe as dd

def preprocess_files(filename):
    """Reads file, collects metadata and identifies lines not containing data.
    """
    ...
    return filename, metadata, skiprows

def load_file(filename, skiprows):
    """Loads the file into a pandas dataframe, skipping lines not containing data."""
    ...
    return df

def process_errors(filename, skiplines):
    """Calculates error metrics based on the information 
    collected in the pre-processing step
    """
    ...

def process_metadata(filename, metadata):
    """Analyses metadata collected in the pre-processing step."""
    ...

values = [delayed(preprocess_files)(fn) for fn in file_names]
filenames = [value[0] for value in values]
metadata = [value[1] for value in values]
skiprows = [value[2] for value in values]

error_results = [delayed(process_errors)(arg[0], arg[1]) 
                 for arg in zip(filenames, skiprows)]
meta_results = [delayed(process_metadata)(arg[0], arg[1]) 
                for arg in zip(filenames, metadata)]

dfs = [delayed(load_file)(arg[0], arg[1]) 
       for arg in zip(filenames, skiprows)]
... # several delayed transformations defined on individual dataframes

# finally: categorize several dataframe columns and write them to HDF5
dfs = dd.from_delayed(dfs, meta=metaframe)
dfs.categorize(columns=[...])  # I would like to delay this
dfs.to_hdf(hdf_file_name, '/data',...)  # I would also like to delay this

all_operations = error_results + meta_results # + delayed operations on dask dataframe
# trigger all computation at once, 
# allow re-using of data collected in the pre-processing step.
dask.compute(*all_operations)

The ETL process goes through several steps:

  1. Pre-process the files: identify lines that do not contain any relevant data and parse the metadata.
  2. Using the information gathered, process the error information and the metadata, and load the data lines into pandas dataframes in parallel (re-using the results from the pre-processing step). The operations (process_metadata, process_errors, load_file) share a data dependency in that they all use information gathered in the pre-processing step. Ideally, the pre-processing step would run only once and its results would be shared across processes.
  3. Eventually, collect the pandas dataframes into a dask dataframe, categorize their columns and write them to HDF.

The problem I am having is that categorize and to_hdf trigger computation immediately, discarding the metadata and error data which would otherwise be further processed by process_errors and process_metadata.

I have been told that delaying operations on dask dataframes can cause problems, so I would like to know whether it is possible to trigger the entire computation (processing metadata, processing errors, loading dataframes, transforming them and storing them in HDF format) at once, allowing the different tasks to share the data collected in the pre-processing phase.

Upvotes: 3

Views: 1003

Answers (1)

MRocklin

Reputation: 57251

There are two ways to approach your problem:

  1. Delay everything
  2. Compute in stages

Delay Everything

The to_hdf call accepts a compute= keyword argument that you can set to False. If False, it will hand you back a dask.delayed value that you can compute whenever you feel like it.
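For example, a minimal sketch in terms of the question's names (dfs, metaframe, hdf_file_name, error_results, meta_results), leaving categorize aside for the moment:

ddf = dd.from_delayed(dfs, meta=metaframe)
# compute=False keeps the HDF write lazy and hands back a delayed value
write = ddf.to_hdf(hdf_file_name, '/data', compute=False)

# one compute call now triggers everything, so the pre-processing results are shared
dask.compute(*(error_results + meta_results + [write]))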

The categorize call, however, does need to be computed immediately if you want to keep using dask.dataframe. We're unable to create a consistent dask.dataframe without going through the data more-or-less immediately. Recent improvements in Pandas around unioning categoricals will let us change this in the future, but for now you're stuck. If this is a blocker for you then you'll have to switch down to dask.delayed and handle things manually for a bit with df.to_delayed().
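A rough sketch of that route, where write_partition is a hypothetical helper (not part of dask) and the remaining per-partition work happens in plain pandas:

def write_partition(pdf, path):
    """Hypothetical helper: finish any per-partition pandas work, then write it out."""
    pdf.to_hdf(path, key='/data')

parts = ddf.to_delayed()  # one delayed pandas DataFrame per partition
writes = [delayed(write_partition)(part, 'part-%d.h5' % i)
          for i, part in enumerate(parts)]
dask.compute(*(writes + error_results + meta_results))

Writing each partition to its own file sidesteps concurrent writes to a single HDF5 file; how you recombine the pieces afterwards is up to you.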

Compute in Stages

If you use the distributed scheduler you can stage your computation by using the .persist method.

from dask.distributed import Executor
e = Executor()  # make a local "cluster" on your laptop

delayed_values = e.persist(delayed_values)  # pass the list of delayed values

... define further computations on delayed values ...

results = dask.compute(results)  # compute as normal

This will let you trigger some computations and still let you proceed with defining the rest of your computation. The values that you persist will stay in memory.
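Applied to the question's workflow, a sketch might look like this (using the question's function names; Client is the newer name for the Executor used above):

import dask
from dask import delayed
from dask.distributed import Client

client = Client()  # local "cluster"

values = [delayed(preprocess_files)(fn) for fn in file_names]
values = client.persist(values)  # pre-processing runs once and its results stay in memory

# downstream work reuses the persisted results instead of recomputing them
error_results = [delayed(process_errors)(v[0], v[2]) for v in values]
meta_results = [delayed(process_metadata)(v[0], v[1]) for v in values]
dask.compute(*(error_results + meta_results))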

Upvotes: 4
