N asks

Reputation: 11

Can I join two tables in Dataflow (Apache Beam) on multiple keys (join condition)?

I have two tables like:

Table 1:

ID  Name   Age
1   Alex   20
2   Sarah  21
...

Table 2:

ID  Name   Marks
1   Alex   80
2   Sarah  78
...

I want to join these two tables using Cloud Dataflow (Apache Beam) on more than one key (join condition), i.e. ID and Name are both common columns. How can I do so?

I have tried joining them on one key (one common column), but I don't know how to use more than one key.

I have used this code as a reference:

https://github.com/GoogleCloudPlatform/professional-services/blob/master/examples/dataflow-python-examples/dataflow_python_examples/data_lake_to_mart.py

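import logging
import traceback

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.pvalue import AsDict


class JoinTables:
    def add_key_details(self, row, key_details):
        # Copy the table2 row, then merge in the table1 row with the same name.
        result = row.copy()
        try:
            result.update(key_details[row['name']])
        except KeyError as err:
            traceback.print_exc()
            logging.error("Name Not Found error: %s", err)
        return result


def run(argv=None):
    jointables = JoinTables()
    p = beam.Pipeline(options=PipelineOptions(argv))

    # Key each table1 row by name so it can be passed as a dict side input.
    table1 = (
        p
        | 'Read table1 details from BigQuery' >> beam.io.Read(
            beam.io.BigQuerySource(
                query='SELECT * FROM `dataset.table1`',
                use_standard_sql=True
            )
        )
        | 'Key Details 1' >> beam.Map(lambda row: (row['name'], row))
    )

    # For each table2 row, look up the matching table1 row by name.
    table2 = (
        p
        | 'Read table2 details from BigQuery' >> beam.io.Read(
            beam.io.BigQuerySource(
                query='SELECT * FROM `dataset.table2`',
                use_standard_sql=True
            )
        )
        | 'Join data with side input 1' >> beam.Map(
            jointables.add_key_details, AsDict(table1))
    )

    p.run().wait_until_finish()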

Upvotes: 1

Views: 2853

Answers (2)

Rafaël

Reputation: 1027

TL;DR: Map table1 with a tuple key (id, name), then look up the row using those two values.

# Map using tuple
| 'Key Details 1' >> beam.Map(lambda row: ((row['id'], row['name']), row))

# Access using tuple
result.update(key_details[(row['id'], row['name'])])

Explanation:

The join here basically works like this:

  1. Convert table1 into KV pairs, where K is a field and V is the row:
beam.Map(lambda row: (row['name'], row))
  2. Pass table1 as a side input, in the form of a dictionary:
beam.Map(jointables.add_key_details, AsDict(table1))
  3. For each row of table2, get the matching table1 row using the same key and update the table2 row:
result.update(key_details[row['name']])
  4. Return the new row with the added fields.

So here, the field used in steps 1 and 3 is 'name'. To join on a different field, use that field instead (e.g. row['id']). The trick for joining on multiple fields is to use a tuple as the key: map your rows on (row['id'], row['name']) and use the same tuple in add_key_details to look up the correct table1 row.
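Putting the steps together, here is a minimal sketch of the tuple-key side-input join. It assumes lowercase column names ('id', 'name') and uses beam.Create with sample rows in place of the BigQuery reads so it can run locally. The helper names key_by and run_demo are made up for this illustration, and this add_key_details takes an extra columns argument and skips rows without a match instead of logging an error.

```python
def key_by(row, columns):
    """Return (composite_key, row); the key is a tuple of the join columns."""
    return tuple(row[c] for c in columns), row


def add_key_details(row, key_details, columns):
    """Merge the matching table1 row (if any) into a copy of a table2 row."""
    result = dict(row)
    match = key_details.get(tuple(row[c] for c in columns))
    if match is not None:
        result.update(match)
    return result


def run_demo():
    # Imported here so the two helpers above can be tried without Beam installed.
    import apache_beam as beam
    from apache_beam.pvalue import AsDict

    join_columns = ('id', 'name')  # assumed column names

    with beam.Pipeline() as p:
        # Sample rows standing in for `dataset.table1`.
        table1 = (
            p
            | 'Create table1' >> beam.Create([
                {'id': 1, 'name': 'Alex', 'age': 20},
                {'id': 2, 'name': 'Sarah', 'age': 21},
            ])
            | 'Key table1' >> beam.Map(key_by, join_columns)
        )

        # Sample rows standing in for `dataset.table2`.
        _ = (
            p
            | 'Create table2' >> beam.Create([
                {'id': 1, 'name': 'Alex', 'marks': 80},
                {'id': 2, 'name': 'Sarah', 'marks': 78},
            ])
            | 'Join on (id, name)' >> beam.Map(
                add_key_details, AsDict(table1), join_columns)
            | 'Print' >> beam.Map(print)
        )
```

Calling run_demo() runs the pipeline on the local runner. Keep in mind that a dict side input has to fit in worker memory, so this approach suits cases where table1 is reasonably small.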

Hope this helps!

Upvotes: 2

Pavan Kumar Kattamuri

Reputation: 335

Yes, you can do that. Once you have two PCollections from the two BigQuery sources, you need to do some pre-processing, use CoGroupByKey, and then unnest the records, as answered by Rafael. If you need similar joins in multiple places, you can also write a composite transform that abstracts all of this, so that you just pass in the PCollections and the column names you want to join on. This is well described here in this blog.
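As a rough sketch of the CoGroupByKey route (not the code from the blog), the pre-processing step keys both PCollections by the same (id, name) tuple, and the unnest step emits one merged row per matching pair, which gives inner-join behaviour. The column names, the sample rows, and the helper names key_by, unnest and run_demo are assumptions made for this example.

```python
def key_by(row, columns):
    """Pre-processing: return (composite_key, row) for the join columns."""
    return tuple(row[c] for c in columns), row


def unnest(element):
    """Yield one merged row per (table1, table2) pair that shares a key."""
    _key, grouped = element
    for left in grouped['table1']:
        for right in grouped['table2']:
            merged = dict(left)
            merged.update(right)
            yield merged


def run_demo():
    # Imported here so the two helpers above can be tried without Beam installed.
    import apache_beam as beam

    join_columns = ('id', 'name')  # assumed column names

    with beam.Pipeline() as p:
        table1 = (
            p
            | 'Create table1' >> beam.Create([
                {'id': 1, 'name': 'Alex', 'age': 20},
                {'id': 2, 'name': 'Sarah', 'age': 21},
            ])
            | 'Key table1' >> beam.Map(key_by, join_columns)
        )
        table2 = (
            p
            | 'Create table2' >> beam.Create([
                {'id': 1, 'name': 'Alex', 'marks': 80},
                {'id': 2, 'name': 'Sarah', 'marks': 78},
            ])
            | 'Key table2' >> beam.Map(key_by, join_columns)
        )

        # The dict tags become the keys of the grouped result for each join key.
        _ = (
            {'table1': table1, 'table2': table2}
            | 'Group by (id, name)' >> beam.CoGroupByKey()
            | 'Unnest' >> beam.FlatMap(unnest)
            | 'Print' >> beam.Map(print)
        )
```

Unlike the side-input approach, neither table has to fit in memory here, because the grouping is done as a shuffle. Keys present in only one table produce no output with this unnest; a left or outer join would need it to handle the empty side explicitly.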

Upvotes: 1
