PANDA Stack

Reputation: 1363

Splitting file processing by initial keys

Use Case

I have some terabytes of US property data to merge. It is spread across two distinct file formats and thousands of files. The source data is split geographically.

I can't find a way to branch a single pipeline into many independent processing flows.

This is especially difficult because the DataFrame API doesn't seem to support applying a PTransform over a collection of filenames.

Detailed Background

The ideal pipeline would split into thousands of independent processing steps and complete in minutes.

The files are distributed in a directory structure like this:

📂state-data/
 |-📜AL.zip
 |-📜AK.zip
 |-📜...
 |-📜WY.zip
📂county-data/
 |-📂AL/
   |-📜COUNTY1.csv
   |-📜COUNTY2.csv
   |-📜...
   |-📜COUNTY68.csv
 |-📂AK/
   |-📜...
 |-📂.../
 |-📂WY/
   |-📜...

Sample Data

This is extremely abbreviated, but imagine something like this:

State Level Data - 51 of these (~200 cols wide)

uid      census_plot    flood_zone
abc121   ACVB-1249575   R50
abc122   ACVB-1249575   R50
abc123   ACVB-1249575   R51
abc124   ACVB-1249599   R51
abc125   ACVB-1249599   R50
...      ...            ...

County Level Data - thousands of these (~300 cols wide)

uid      county   subdivision      tax_id
abc121   04021    Roland Heights   3t4g
abc122   04021    Roland Heights   3g444
abc123   04021    Roland Heights   09udd
...      ...      ...              ...

So we join many county-level files to a single state-level file, which gives us an aggregated, more complete state-level data set.

Then we aggregate all the states, and we have a national level data set.
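
Conceptually, the per-state merge is just a join on uid. Here is a minimal pandas sketch of that relationship (the file and column names are taken from the abbreviated samples above; the real merge logic is more involved):

import pandas as pd

# Illustrative single-state, single-county example of the uid join.
state_df = pd.read_csv("state-data/AL.zip", compression="zip")
county_df = pd.read_csv("county-data/AL/COUNTY1.csv")

# County columns are joined onto the state rows by uid.
merged = state_df.merge(county_df, on="uid", how="left")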

Desired Outcome

I can successfully merge one state at a time (many county files to one state file). I built a pipeline to do that, but it starts from a single CountyData CSV pattern and a single StateData CSV. The issue is getting to the point where I can load the CountyData and StateData for every state.

In other words:

#
# I need to find a way to generalize this flow to
# dynamically created COUNTY and STATE variables.
#

import apache_beam as beam
import pandas as pd

from apache_beam.dataframe.convert import to_pcollection
from apache_beam.dataframe.io import read_csv
from apache_beam.io.parquetio import WriteToParquet

COUNTY = "county-data/AL/*.csv"
STATE = "state-data/AL.zip"

def key_by_uid(elem):
    return (elem.uid, elem)

with beam.Pipeline() as p:
    # County files are read with the Beam DataFrame API.
    county_df = p | read_csv(COUNTY)
    county_rows_keyed = to_pcollection(county_df) | beam.Map(key_by_uid)

    # The zipped state file is read eagerly with pandas, then embedded in the pipeline.
    state_df = pd.read_csv(STATE, compression="zip")
    state_rows_keyed = to_pcollection(state_df, pipeline=p) | beam.Map(key_by_uid)

    # merge_logic is my existing per-uid merge function (definition omitted).
    merged = (
        {"state": state_rows_keyed, "county": county_rows_keyed}
        | beam.CoGroupByKey()
        | beam.Map(merge_logic)
    )

    merged | WriteToParquet()  # path/schema arguments omitted here

Conceptually, the pipeline I'm after would:

  1. Start with a list of states.
  2. By state, generate filepatterns to the source data.
  3. By state, load and merge the files.
  4. Flatten the output from each state into a US data set.
  5. Write to a Parquet file.

In pseudo-code:

# cx.STATES is my list of state abbreviations. The tx.* transforms are
# placeholders; MergeSourceDataByState is the part I can't figure out.
with beam.Pipeline(options=pipeline_options) as p:

    merged_data = (
        p
        | beam.Create(cx.STATES)
        | "PathsKeyedByState" >> tx.PathsKeyedByState()
        # ('AL', {'county-data': 'gs://data/county-data/AL/COUNTY*.csv', 'state-data': 'gs://data/state-data/AL.zip'})
        | "MergeSourceDataByState" >> tx.MergeSourceDataByState()
        | "MergeAllStateData" >> beam.Flatten()
    )

    merged_data | "WriteParquet" >> tx.WriteParquet()

The issue I'm having is the MergeSourceDataByState step: I can't find a way to turn each (state, filepatterns) element into its own load-and-merge flow, because read_csv and the rest of the DataFrame API operate on the pipeline, not on individual elements of a PCollection.

Upvotes: 0

Views: 115

Answers (2)

robertwb

Reputation: 5104

Here's an alternative answer.

Read the state data one at a time and flatten them, e.g.

# Build one big deferred dataframe by appending each state's frame.
state_dataframe = None
for state in STATES:
  df = p | f'Read {state}' >> read_csv(f'/path/to/{state}')  # per-state path placeholder
  df['state'] = state  # tag each row with its state
  if state_dataframe is None:
    state_dataframe = df
  else:
    state_dataframe = state_dataframe.append(df)

Similarly for county data. Now join them using dataframe operations.
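
A minimal sketch of that join, assuming county_dataframe was built with the same kind of loop and that uid is the join key as in the question's sample data:

# Join the combined county data onto the combined state data by uid.
merged = state_dataframe.merge(
    county_dataframe, on='uid', how='left', suffixes=('_state', '_county'))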

Upvotes: 1

robertwb

Reputation: 5104

I'm not sure exactly what kind of merging you're doing here, but one way to structure this pipeline would be a DoFn that takes a county-data filename as its input element (i.e. you'd have a PCollection of county-data filenames), opens the file with "normal" Python (e.g. pandas), and reads the relevant state data in as a side input to do the merge.
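
A minimal sketch of that structure, under assumptions not stated in the answer: STATES is the question's list of state abbreviations, the paths are readable by pandas, and the state data is small enough to pass around as an AsDict side input keyed by state:

import apache_beam as beam
import pandas as pd
from apache_beam.io.fileio import MatchFiles, ReadMatches

class MergeCountyFile(beam.DoFn):
    """Open one county CSV with pandas and join it against the state data
    provided as a side input (a dict keyed by state abbreviation)."""

    def process(self, readable_file, state_data_by_state):
        path = readable_file.metadata.path
        state = path.split('/')[-2]   # .../county-data/AL/COUNTY1.csv -> 'AL'
        county_df = pd.read_csv(readable_file.open())
        state_df = state_data_by_state[state]
        merged = county_df.merge(state_df, on='uid', how='left')
        yield from merged.to_dict('records')

with beam.Pipeline() as p:
    # Side input: state abbreviation -> pandas DataFrame of that state's data.
    state_data = (
        p
        | 'States' >> beam.Create(STATES)
        | 'ReadState' >> beam.Map(
            lambda s: (s, pd.read_csv(f'state-data/{s}.zip', compression='zip')))
    )

    merged_records = (
        p
        | 'MatchCounty' >> MatchFiles('county-data/*/*.csv')
        | 'ReadCounty' >> ReadMatches()
        | 'Merge' >> beam.ParDo(
            MergeCountyFile(),
            state_data_by_state=beam.pvalue.AsDict(state_data))
    )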

Upvotes: 0
