Reputation: 1363
I have some terabytes of US property data to merge. It is spread across two distinct file formats and thousands of files. The source data is split geographically.
I can't find a way to branch a single pipeline into many independent processing flows. This is especially difficult because the DataFrame API doesn't seem to support applying a PTransform to a collection of filenames.
The ideal pipeline would split into thousands of independent processing steps and complete in minutes.
The directory structure is like this:
📂 state-data/
|- 📄 AL.zip
|- 📄 AK.zip
|- 📄 ...
|- 📄 WY.zip
📂 county-data/
|- 📂 AL/
|  |- 📄 COUNTY1.csv
|  |- 📄 COUNTY2.csv
|  |- 📄 ...
|  |- 📄 COUNTY68.csv
|- 📂 AK/
|  |- 📄 ...
|- 📂 .../
|- 📂 WY/
|  |- 📄 ...
This is extremely abbreviated, but imagine something like this:
State Level Data - 51 of these (~200 cols wide)
uid | census_plot | flood_zone |
---|---|---|
abc121 | ACVB-1249575 | R50 |
abc122 | ACVB-1249575 | R50 |
abc123 | ACVB-1249575 | R51 |
abc124 | ACVB-1249599 | R51 |
abc125 | ACVB-1249599 | R50 |
... | ... | ... |
County Level Data - thousands of these (~300 cols wide)
uid | county | subdivision | tax_id |
---|---|---|---|
abc121 | 04021 | Roland Heights | 3t4g |
abc122 | 04021 | Roland Heights | 3g444 |
abc123 | 04021 | Roland Heights | 09udd |
... | ... | ... | ... |
So we join many county-level files to a single state-level file, and thus have an aggregated, more-complete state-level data set.
Then we aggregate all the states, and we have a national level data set.
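In plain pandas terms (ignoring scale), the per-state merge is just a join on uid. A minimal sketch using AL, with illustrative paths and the abbreviated columns above:
import glob
import pandas as pd

# Minimal per-state merge sketch; paths and column names follow the
# abbreviated tables above and are illustrative only.
state_df = pd.read_csv("state-data/AL.zip", compression="zip")
county_df = pd.concat(pd.read_csv(path) for path in glob.glob("county-data/AL/*.csv"))
al_merged = county_df.merge(state_df, on="uid", how="left")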
I can successfully merge one state at a time (many counties to one state). I built a pipeline to do that, but the pipeline starts with a single CountyData CSV and a single StateData CSV. The issue is getting to the point where I can load the CountyData and StateData.
In other words:
#
# I need to find a way to generalize this flow to
# dynamically created COUNTY and STATE variables.
#
import apache_beam as beam
import pandas as pd
from apache_beam.dataframe.convert import to_pcollection
from apache_beam.dataframe.io import read_csv
from apache_beam.io.parquetio import WriteToParquet

COUNTY = "county-data/AL/*.csv"
STATE = "state-data/AL.zip"

def key_by_uid(elem):
    return (elem.uid, elem)

with beam.Pipeline() as p:
    county_df = p | read_csv(COUNTY)
    county_rows_keyed = to_pcollection(county_df) | beam.Map(key_by_uid)

    state_df = pd.read_csv(STATE, compression="zip")
    state_rows_keyed = to_pcollection(state_df, pipeline=p) | beam.Map(key_by_uid)

    merged = (
        {"state": state_rows_keyed, "county": county_rows_keyed}
        | beam.CoGroupByKey()
        | beam.Map(merge_logic)  # merge_logic elided in this abbreviated example
    )
    merged | WriteToParquet()  # path/schema args elided
The generalized pipeline I'm aiming for looks something like this:
with beam.Pipeline(options=pipeline_options) as p:
    merged_data = (
        p
        | beam.Create(cx.STATES)
        | "PathsKeyedByState" >> tx.PathsKeyedByState()
        # ('AL', {'county-data': 'gs://data/county-data/AL/COUNTY*.csv', 'state-data': 'gs://data/state-data/AL.zip'})
        | "MergeSourceDataByState" >> tx.MergeSourceDataByState()
        | "MergeAllStateData" >> beam.Flatten()
    )
    merged_data | "WriteParquet" >> tx.WriteParquet()
The issue I'm having is something like this: I can't use read_csv inside a DoFn to read the files for each element, because read_csv is a PTransform and has to be applied to the Pipeline itself, e.g. df = p | read_csv(...)
Upvotes: 0
Views: 115
Reputation: 5104
Here's an alternative answer.
Read the state data one at a time and flatten them, e.g.
state_dataframe = None
for state in STATES:
    # Unique labels keep Beam from complaining about duplicate transforms.
    df = p | f'ReadState{state}' >> read_csv(f'/path/to/{state}')
    df['state'] = state
    if state_dataframe is None:
        state_dataframe = df
    else:
        state_dataframe = state_dataframe.append(df)
Similarly for county data. Now join them using dataframe operations.
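A minimal sketch of the same idea applied to the county files, followed by the join; the globs, labels, and merge keys are assumptions based on the layout in the question:
county_dataframe = None
for state in STATES:
    # One glob per state directory of county CSVs.
    df = p | f'ReadCounties{state}' >> read_csv(f'county-data/{state}/*.csv')
    df['state'] = state
    if county_dataframe is None:
        county_dataframe = df
    else:
        county_dataframe = county_dataframe.append(df)

# Deferred dataframes support a pandas-like merge on the shared columns.
national = state_dataframe.merge(
    county_dataframe, on=['uid', 'state'], suffixes=('_state', '_county'))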
Upvotes: 1
Reputation: 5104
I'm not sure exactly what kind of merging you're doing here, but one way to structure this pipeline might be to have a DoFn that takes a county-data filename as its input element (i.e. you'd have a PCollection of county data filenames), opens it using "normal" Python (e.g. pandas), and then reads the relevant state data in as a side input to do the merge.
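A minimal sketch of that shape, assuming the 51 state files are small enough to hold in memory as a dict-valued side input; the paths, helper names, and merge key are illustrative:
import apache_beam as beam
import pandas as pd
from apache_beam.io import fileio

STATES = ['AL', 'AK', 'WY']  # abbreviated

class MergeCountyWithState(beam.DoFn):
    # Opens one county CSV with plain pandas and joins it to its state's data.
    def process(self, county_path, state_dfs):
        state = county_path.split('/')[-2]  # .../county-data/AL/COUNTY1.csv -> 'AL'
        county_df = pd.read_csv(county_path)
        merged = county_df.merge(state_dfs[state], on='uid')
        yield from merged.to_dict('records')

def load_state(state):
    # Plain pandas read, outside the DataFrame API.
    return state, pd.read_csv(f'state-data/{state}.zip', compression='zip')

with beam.Pipeline() as p:
    # Side input: {state: DataFrame} built from the 51 state files.
    state_side = beam.pvalue.AsDict(
        p | 'States' >> beam.Create(STATES)
          | 'LoadStateData' >> beam.Map(load_state))

    county_paths = (p
        | 'MatchCounties' >> fileio.MatchFiles('county-data/*/*.csv')
        | 'GetPaths' >> beam.Map(lambda metadata: metadata.path))

    merged = county_paths | 'Merge' >> beam.ParDo(
        MergeCountyWithState(), state_dfs=state_side)
    # merged is a PCollection of dicts, ready for e.g. WriteToParquet.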
Upvotes: 0