zukah

Reputation: 46

Using Dask to parallelize HDF read-translate-write

TL;DR: We're having trouble using Dask to parallelize Pandas code that reads from and writes to the same HDF file.

I'm working on a project that generally requires three steps: reading, translating (or combining), and writing data. For context, we work with medical records: we receive claims in different formats, translate them into a standardized format, and then write them back to disk. Ideally, I'd like to save the intermediate datasets in a form I can access via Python/Pandas later.

Currently I've chosen HDF as my data storage format; however, I'm running into runtime issues. On a large population the code can take upwards of a few days to run. This has led me to investigate Dask, but I'm not sure I've applied it to my situation in the best way.
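For example, once the translate step below has produced a 'visits' table, we'd like downstream analysis to be able to pull it straight back into pandas, roughly like this (illustrative):

# Illustrative: re-open an intermediate table from the store in a later session
visits = pd.read_hdf('test_data.h5', 'visits')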

What follows is a working example of my workflow, hopefully with enough sample data to give a sense of the runtime issues.

Read (in this case Create) data

import pandas as pd
import numpy as np
import dask
from dask import delayed
from dask import dataframe as dd
import random
from datetime import timedelta
from pandas import HDFStore

member_id = range(1, 10000)
window_start_date = pd.to_datetime('2015-01-01')
start_date_col = [window_start_date + timedelta(days=random.randint(0, 730)) for i in member_id]

# Eligibility records
eligibility = pd.DataFrame({'member_id': member_id,
                            'start_date': start_date_col})
eligibility['end_date'] = eligibility['start_date'] + timedelta(days=365)
eligibility['insurance_type'] = np.random.choice(['HMO', 'PPO'], len(member_id), p=[0.4, 0.6])
eligibility['gender'] = np.random.choice(['F', 'M'], len(member_id), p=[0.6, 0.4])
(eligibility.set_index('member_id')
 .to_hdf('test_data.h5',
         key='eligibility',
         format='table'))

# Inpatient records
inpatient_record_number = range(1, 20000)
service_date = [window_start_date + timedelta(days=random.randint(0, 730)) for i in inpatient_record_number]
inpatient = pd.DataFrame({'inpatient_record_number': inpatient_record_number,
                          'service_date': service_date})
inpatient['member_id'] = np.random.choice(member_id, len(inpatient_record_number))
inpatient['procedure'] = np.random.choice(['A', 'B', 'C', 'D'], len(inpatient_record_number))
(inpatient.set_index('member_id')
 .to_hdf('test_data.h5',
         key='inpatient',
         format='table'))

# Outpatient records
outpatient_record_number = range(1, 30000)
service_date = [window_start_date + timedelta(days=random.randint(0, 730)) for i in outpatient_record_number]
outpatient = pd.DataFrame({'outpatient_record_number': outpatient_record_number,
                           'service_date': service_date})
outpatient['member_id'] = np.random.choice(member_id, len(outpatient_record_number))
outpatient['procedure'] = np.random.choice(['A', 'B', 'C', 'D'], len(outpatient_record_number))
(outpatient.set_index('member_id')
 .to_hdf('test_data.h5',
         key='outpatient',
         format='table'))

Translate/Write data

Sequential approach

def pull_member_data(member_i):
    inpatient_slice = pd.read_hdf('test_data.h5', 'inpatient', where='index == "{}"'.format(member_i))
    outpatient_slice = pd.read_hdf('test_data.h5', 'outpatient', where='index == "{}"'.format(member_i))
    return inpatient_slice, outpatient_slice


def create_visits(inpatient_slice, outpatient_slice):
    # In reality this is more complicated, using some logic to combine inpatient/outpatient/ER into medical 'visits'
    # But for simplicity, we'll just stack the inpatient/outpatient and assign a record identifier
    visits_stacked = pd.concat([inpatient_slice, outpatient_slice]).reset_index().sort_values('service_date')
    visits_stacked.insert(0, 'visit_id', range(1, len(visits_stacked) + 1))
    return visits_stacked


def save_visits_to_hdf(visits_slice):
    with HDFStore('test_data.h5', mode='a') as store:
        store.append('visits', visits_slice)


# Read in the data by member_id, perform some operation
def translate_by_member(member_i):
    inpatient_slice, outpatient_slice = pull_member_data(member_i)
    visits_slice = create_visits(inpatient_slice, outpatient_slice)
    save_visits_to_hdf(visits_slice)


def run_translate_sequential():
    # Simple approach: Loop through each member sequentially
    for member_i in member_id:
        translate_by_member(member_i)

run_translate_sequential()

The above code takes ~9 minutes to run on my machine.

Dask approach

def create_visits_dask_version(visits_stacked):
    # In reality this is more complicated, using some logic to combine inpatient/outpatient/ER
    # But for simplicity, we'll just stack the inpatient/outpatient and assign a record identifier
    len_of_visits = visits_stacked.shape[0]
    visits_stacked_1 = (visits_stacked
                        .sort_values('service_date')
                        .assign(visit_id=range(1, len_of_visits + 1))
                        .set_index('visit_id')
                        )
    return visits_stacked_1


def run_translate_dask():
    # Approach 2: Dask, with individual writes to HDF
    inpatient_dask = dd.read_hdf('test_data.h5', 'inpatient')
    outpatient_dask = dd.read_hdf('test_data.h5', 'outpatient')
    stacked = dd.concat([inpatient_dask, outpatient_dask])
    visits = stacked.groupby('member_id').apply(create_visits_dask_version)
    visits.to_hdf('test_data_dask.h5', 'visits')

run_translate_dask()

This Dask approach takes 13 seconds(!)

While this is a great improvement, we're generally curious about a few things:

  1. Given this simple example, is using Dask dataframes, concatenating them, and applying groupby/apply the best approach?

  2. In reality, we have multiple processes like this that read from and write to the same HDF file. Our original codebase was structured so that the entire workflow could be run one member_id at a time. When we tried to parallelize those runs, it sometimes worked on small samples, but most of the time produced a segmentation fault (a rough sketch of what we mean follows this list). Are there known issues with parallelizing workflows that read from and write to HDF files like this? We're working on producing a reproducible example of that as well, but figured we'd post this here in case it triggers suggestions (or in case this code helps someone facing a similar problem).
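For concreteness, the kind of per-member parallelization we mean looks roughly like the sketch below, built on the sequential functions above (illustrative only, not our actual code):

# Illustrative sketch of parallelizing the per-member workflow.
# Each task both reads from and appends to the same 'test_data.h5';
# HDF5/PyTables does not support concurrent writes to a single file,
# which we suspect is behind the segmentation faults.
tasks = [delayed(translate_by_member)(member_i) for member_i in member_id]
dask.compute(*tasks, scheduler='threads')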

Any and all feedback appreciated!

Upvotes: 1

Views: 1139

Answers (1)

MRocklin

Reputation: 57271

In general, groupby-apply will be fairly slow. It is challenging to re-sort data like this, especially with limited memory.

In general, I recommend using the Parquet format (dask.dataframe has to_parquet and read_parquet functions). You are much less likely to get segfaults than with HDF files.
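For example, a Parquet version of the pipeline in the question might look roughly like the sketch below (paths are illustrative, and it assumes a Parquet engine such as pyarrow or fastparquet is installed):

import dask.dataframe as dd

# One-time conversion of the existing HDF tables to Parquet datasets
dd.read_hdf('test_data.h5', 'inpatient').to_parquet('inpatient.parquet')
dd.read_hdf('test_data.h5', 'outpatient').to_parquet('outpatient.parquet')

# Same pipeline as in the question, but reading from and writing to Parquet
inpatient_dask = dd.read_parquet('inpatient.parquet')
outpatient_dask = dd.read_parquet('outpatient.parquet')
stacked = dd.concat([inpatient_dask, outpatient_dask])
visits = stacked.groupby('member_id').apply(create_visits_dask_version)

# to_parquet writes one file per partition into a directory, so no single
# shared file is being appended to concurrently, unlike the HDF5 case
visits.to_parquet('visits.parquet')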

Upvotes: 1
