DarioB

Reputation: 1609

Apache Beam: how to create a schema-aware PCollection

I am trying to use Apache Beam to read two datasets and update rows of the first one when a row with a matching ID is present in the second dataset. Here is the Beam pipeline:


import apache_beam as beam
from apache_beam.dataframe.convert import to_dataframe, to_pcollection
from apache_beam.dataframe.io import read_csv


class MapDataFrameRow(beam.DoFn):

    def process(self, element):
        key = element.id
        yield key, element


class MergeTuples(beam.DoFn):

    def process(self, element):
        # element is (key, (current_state_rows, update_rows)) from CoGroupByKey.
        key, data = element
        if len(data[1]) > 0:
            # An update exists for this ID: keep the updated row.
            yield data[1][0]
        else:
            # No update for this ID: keep the current state row.
            yield data[0][0]

def run(pipeline_options, state_file, gcs_input_file_path, gcs_output_path):
    with beam.Pipeline(options=pipeline_options) as pipeline:

        current_state_df = pipeline | 'Read current state CSV' >> read_csv(state_file)
        updates_df = pipeline | 'Read data updates CSV' >> read_csv(gcs_input_file_path)

        # Translate the Beam dataframes to PCollections.
        #  MapDataFrameRow is a DoFn that emits the ID as the tuple key.
        cs_pc = to_pcollection(current_state_df, include_indexes=False) | "Map #1 to KV" >> beam.ParDo(MapDataFrameRow())
        nu_pc = to_pcollection(updates_df, include_indexes=False) | "Map #2 to KV" >> beam.ParDo(MapDataFrameRow())

        merged = (
                (cs_pc, nu_pc)
                | 'group by key' >> beam.CoGroupByKey()
                | 'keep most recent' >> beam.ParDo(MergeTuples())
                # | 'To rows' >> beam.Map(lambda word: beam.Row(??))
                | beam.ParDo(print)
        )


        df = to_dataframe(merged)
        df.to_csv(gcs_output_path)

The code produces the following output:

BeamSchema_c46d9a9f_a000_453e_87fe_7f4b372c21a1(id=143, area=130.0, .... )
BeamSchema_c46d9a9f_a000_453e_87fe_7f4b372c21a1(id=144, area=130.0, ... )
BeamSchema_c46d9a9f_a000_453e_87fe_7f4b372c21a1(id=145, area=130.0, ... )
...

Unfortunately, I am not able to map the merged PCollection into a dataframe; I get the following error:

TypeError: Could not determine schema for type hint Any. Did you mean to create a schema-aware PCollection? See https://s.apache.org/beam-python-schemas

I have tried using | 'To rows' >> beam.Map(lambda word: beam.Row(??)) (commented out in the code above), but I don't want to specify all the key-value pairs manually. Is there a way to easily map the BeamSchema to a dataframe?

EDIT: It appears that the problem is that to_dataframe(merged) doesn't work with the two different schemas, BeamSchema_c46d9a9f_xxxx and BeamSchema_3369b0_xxxx, generated by the two to_pcollection calls. In other words, I am creating two separate PCollections, picking rows from one source or the other, and then trying to create a new dataframe; because the rows appear to Beam as having two separate schemas, it rejects them. What would be a way to merge the two dataframes?

Upvotes: 1

Views: 3055

Answers (1)

chamikara

Reputation: 2024

First of all, I see that you applied the transform beam.ParDo(print) to produce the PCollection merged. This means that the PCollection you are trying to convert will contain the objects returned by this function (the built-in print returns None, so merged would become a PCollection of None values). Are you using a custom print function that returns the original object after printing?
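For example, a minimal pass-through debug step (a sketch; the name log_and_pass is just illustrative) would print each element and still emit it downstream:

def log_and_pass(element):
    # Print for debugging, then return the element unchanged so that
    # downstream transforms still see the original schema'd objects.
    print(element)
    return element

merged = (
        (cs_pc, nu_pc)
        | 'group by key' >> beam.CoGroupByKey()
        | 'keep most recent' >> beam.ParDo(MergeTuples())
        | 'debug print' >> beam.Map(log_and_pass)
)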

If the above does not resolve the issue, you have to make sure that Beam Python can infer a schema from the type of objects the PCollection contains. For custom types, this can be done by converting the objects to NamedTuples. See https://s.apache.org/beam-python-schemas for more details.
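For example, a minimal sketch, assuming the print step has been removed or made pass-through as above, and assuming only the id and area fields visible in your printed output (your real rows will have more columns):

import typing

import apache_beam as beam
from apache_beam.dataframe.convert import to_dataframe


# One explicit schema for the merged rows, so Beam no longer sees two
# distinct generated BeamSchema_* types. The field list (id, area) is an
# assumption based on the printed output; extend it to match your CSVs.
class StateRow(typing.NamedTuple):
    id: int
    area: float


beam.coders.registry.register_coder(StateRow, beam.coders.RowCoder)

merged_rows = (
        merged
        # Re-wrap rows from either source into the common NamedTuple type.
        | 'To StateRow' >> beam.Map(
            lambda row: StateRow(id=row.id, area=row.area)
        ).with_output_types(StateRow)
)

df = to_dataframe(merged_rows)
df.to_csv(gcs_output_path)

Because rows from both sources are re-wrapped into the same StateRow type, to_dataframe no longer has to reconcile two separately generated schemas.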

This should allow to_dataframe to properly discover the schema for the type of objects the PCollection contains.

Upvotes: 1
