Reputation: 1609
I am trying to use Apache Beam to read two datasets and update the first one whenever a row with a matching ID is present in the second dataset. Here is the Beam pipeline:
import apache_beam as beam
from apache_beam.dataframe.convert import to_dataframe, to_pcollection
from apache_beam.dataframe.io import read_csv


class MapDataFrameRow(beam.DoFn):
    def process(self, element):
        # Use the row's id as the key of a (key, value) tuple.
        key = element.id
        yield key, element


class MergeTuples(beam.DoFn):
    def process(self, element):
        # element is (key, (current_state_rows, update_rows)) from CoGroupByKey.
        key, data = element
        if len(data[1]) > 0:
            # An update exists for this id: keep the updated row.
            yield data[1][0]
        else:
            # No update: keep the current-state row.
            yield data[0][0]


def run(pipeline_options, state_file, gcs_input_file_path, gcs_output_path):
    with beam.Pipeline(options=pipeline_options) as pipeline:
        current_state_df = pipeline | 'Read current state CSV' >> read_csv(state_file)
        updates_df = pipeline | 'Read data updates CSV' >> read_csv(gcs_input_file_path)

        # Translate the Beam dataframes to PCollections.
        # MapDataFrameRow is a DoFn that maps the ID as the tuple key.
        cs_pc = to_pcollection(current_state_df, include_indexes=False) | "Map #1 to KV" >> beam.ParDo(MapDataFrameRow())
        nu_pc = to_pcollection(updates_df, include_indexes=False) | "Map #2 to KV" >> beam.ParDo(MapDataFrameRow())

        merged = (
            (cs_pc, nu_pc)
            | 'group by key' >> beam.CoGroupByKey()
            | 'keep most recent' >> beam.ParDo(MergeTuples())
            # | 'To rows' >> beam.Map(lambda word: beam.Row(??))
            | beam.ParDo(print)
        )

        df = to_dataframe(merged)
        df.to_csv(gcs_output_path)
The code produces the following output:
BeamSchema_c46d9a9f_a000_453e_87fe_7f4b372c21a1(id=143, area=130.0, .... )
BeamSchema_c46d9a9f_a000_453e_87fe_7f4b372c21a1(id=144, area=130.0, ... )
BeamSchema_c46d9a9f_a000_453e_87fe_7f4b372c21a1(id=145, area=130.0, ... )
...
Unfortunately I am not able to map the merged PCollection into a dataframe; I get the following error:
TypeError: Could not determine schema for type hint Any. Did you mean to create a schema-aware PCollection? See https://s.apache.org/beam-python-schemas
I have tried using | 'To rows' >> beam.Map(lambda word: beam.Row(??)) (commented out in the code), but I don't want to manually specify all the key/value pairs. Is there a way to easily map the BeamSchema to a dataframe?
EDIT:
It appears that the problem is that to_dataframe(merged) doesn't work with the two different schemas, BeamSchema_c46d9a9f_xxxx and BeamSchema_3369b0_xxxx, generated by the two to_pcollection calls. In other words, I am creating two separate PCollections, picking rows from one source or the other, and then trying to create a new dataframe; but because the two sources appear to Beam as separate schemas, it rejects the result. What would be a way to merge the two dataframes? One idea is sketched below.
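One dataframe-level approach I have been considering (only a sketch; I am not sure whether the Beam DataFrame API supports combine_first on deferred dataframes, and it assumes id is unique in both files):

# Sketch: stay in the DataFrame API instead of dropping to PCollections.
# Assumes set_index/combine_first work on deferred dataframes here and
# that 'id' uniquely identifies rows in both CSVs.
with beam.Pipeline(options=pipeline_options) as pipeline:
    current_state_df = pipeline | 'Read current state CSV' >> read_csv(state_file)
    updates_df = pipeline | 'Read data updates CSV' >> read_csv(gcs_input_file_path)

    # Rows present in updates_df win; rows only in current_state_df are kept.
    merged_df = updates_df.set_index('id').combine_first(
        current_state_df.set_index('id'))
    merged_df.to_csv(gcs_output_path)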
Upvotes: 1
Views: 3055
Reputation: 2024
First of all, I see that you applied the transform beam.ParDo(print) to produce the PCollection merged. This means that the PCollection you are trying to convert will contain the objects returned by this function. Are you using a custom print function that returns the original object after printing? The built-in print returns None, so that step emits no usable elements or type information for to_dataframe to work with.
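If that print is the built-in, a minimal sketch of a pass-through variant that keeps the elements flowing (the function name here is just for illustration):

# Hypothetical pass-through logger: prints each element and re-emits it,
# so downstream transforms still receive the merged rows.
def print_and_passthrough(element):
    print(element)
    yield element

merged = (
    (cs_pc, nu_pc)
    | 'group by key' >> beam.CoGroupByKey()
    | 'keep most recent' >> beam.ParDo(MergeTuples())
    | 'debug print' >> beam.ParDo(print_and_passthrough)
)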
If the above does not resolve the issue, you have to make sure that Beam Python can infer a schema from the type of objects the PCollection contains. For custom types, this can be done by converting the objects to NamedTuples and declaring the output type. See https://s.apache.org/beam-python-schemas for more details. This should allow to_dataframe to properly discover the schema for the type of objects the PCollection contains.
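As a sketch of the NamedTuple approach applied to your pipeline (the field list is an assumption taken from your printed output; extend it to match your real columns):

import typing
import apache_beam as beam
from apache_beam.dataframe.convert import to_dataframe

# Hypothetical schema: only id and area are known from your output sample,
# add the remaining columns with their proper types.
class MergedRow(typing.NamedTuple):
    id: int
    area: float

beam.coders.registry.register_coder(MergedRow, beam.coders.RowCoder)

typed = merged | 'To NamedTuple' >> beam.Map(
    lambda e: MergedRow(id=e.id, area=e.area)
).with_output_types(MergedRow)

df = to_dataframe(typed)

With an explicit output type, to_dataframe no longer has to infer a schema from Any.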
Upvotes: 1