Reputation: 11
I have two tables like:
ID | Name  | Age
---|-------|----
1  | Alex  | 20
2  | Sarah | 21
...and so on

ID | Name  | Marks
---|-------|------
1  | Alex  | 80
2  | Sarah | 78
...and so on
I want to join these two tables using Cloud Dataflow (Apache Beam) on more than one key (join condition), i.e. both ID and Name are common columns. How can I do that?
I have tried joining on a single key (one common column), but I don't know how to join on more than one.
I have used this code as a reference:
import logging
import traceback

import apache_beam as beam
from apache_beam.pvalue import AsDict


class JoinTables:
    def add_key_details(self, row, key_details):
        # Merge the matching table1 row (looked up by name) into this row.
        result = row.copy()
        try:
            result.update(key_details[row['name']])
        except KeyError as err:
            traceback.print_exc()
            logging.error("Name Not Found error: %s", err)
        return result


def run(argv=None):
    jointables = JoinTables()
    p = beam.Pipeline(argv=argv)

    table1 = (p
        | 'Read table1 details from BigQuery' >> beam.io.Read(
            beam.io.BigQuerySource(
                query='SELECT * FROM `dataset.table1`',
                use_standard_sql=True))
        | 'Key Details 1' >> beam.Map(lambda row: (row['name'], row))
    )

    table2 = (p
        | 'Read table2 details from BigQuery' >> beam.io.Read(
            beam.io.BigQuerySource(
                query='SELECT * FROM `dataset.table2`',
                use_standard_sql=True))
        | 'Join data with side input 1' >> beam.Map(
            jointables.add_key_details, AsDict(table1))
    )
Upvotes: 1
Views: 2853
Reputation: 1027
TL;DR: You need to map table1 with a tuple key (id, name) and then access the row using those two values.
# Map using tuple
| 'Key Details 1' >> beam.Map(lambda row: ((row['id'], row['name']), row))
# Access using tuple
result.update(key_details[(row['id'], row['name'])])
Explanation:
Joining here is basically three steps:
1. beam.Map(lambda row: (row['name'], row))
2. beam.Map(jointables.add_key_details, AsDict(table1))
3. result.update(key_details[row['name']])
So here, the field you use in steps 1 and 3 is 'name'. If you want to join on a different field, just reference that field instead (e.g. row['id']). The trick to joining on multiple fields is to make the key a tuple: map your rows on (row['id'], row['name']) and use the same tuple in add_key_details to look up the matching table1 row, as in the sketch below.
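Putting the two changes together, here is a minimal sketch of the question's pipeline with the tuple key applied (the dataset/table names are the question's own placeholders, and add_key_details is assumed to do the tuple lookup shown in the TL;DR):

# Key table1 on the composite (id, name) tuple.
table1 = (p
    | 'Read table1' >> beam.io.Read(beam.io.BigQuerySource(
        query='SELECT * FROM `dataset.table1`', use_standard_sql=True))
    | 'Key Details 1' >> beam.Map(lambda row: ((row['id'], row['name']), row))
)

# add_key_details must now look up key_details[(row['id'], row['name'])].
table2 = (p
    | 'Read table2' >> beam.io.Read(beam.io.BigQuerySource(
        query='SELECT * FROM `dataset.table2`', use_standard_sql=True))
    | 'Join on (id, name)' >> beam.Map(jointables.add_key_details, AsDict(table1))
)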
Hope this helps!
Upvotes: 2
Reputation: 335
Yes, you can do that. Once you have the two PCollections from the two BigQuery sources, you do some pre-processing, use CoGroupByKey, and then unnest the records, as answered by Rafael. If you need to do similar joins in multiple places, you can also wrap all of this in a composite transform and just pass in the PCollections and the column names you want to join on. This is well described here in this blog.
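For illustration, a minimal sketch of that idea, assuming dict-shaped rows and in-memory test data in place of the BigQuery reads (JoinOn and its parameters are hypothetical names for this sketch, not from the blog):

import apache_beam as beam

class JoinOn(beam.PTransform):
    # Composite transform: inner-join two PCollections of dict rows on key_cols.
    def __init__(self, right, key_cols):
        super().__init__()
        self.right = right
        self.key_cols = tuple(key_cols)

    def expand(self, left):
        cols = self.key_cols  # local copy so the closures below don't capture self

        def key_by(row):
            return tuple(row[c] for c in cols), row

        def unnest(element):
            _, grouped = element
            for lrow in grouped['left']:
                for rrow in grouped['right']:
                    yield {**lrow, **rrow}  # right-hand columns win on name clashes

        keyed_left = left | 'Key left' >> beam.Map(key_by)
        keyed_right = self.right | 'Key right' >> beam.Map(key_by)
        return ({'left': keyed_left, 'right': keyed_right}
                | 'CoGroup' >> beam.CoGroupByKey()
                | 'Unnest' >> beam.FlatMap(unnest))

# Usage, with beam.Create standing in for the BigQuery reads:
with beam.Pipeline() as p:
    ages = p | 'Ages' >> beam.Create(
        [{'ID': 1, 'Name': 'Alex', 'Age': 20}, {'ID': 2, 'Name': 'Sarah', 'Age': 21}])
    marks = p | 'Marks' >> beam.Create(
        [{'ID': 1, 'Name': 'Alex', 'Marks': 80}, {'ID': 2, 'Name': 'Sarah', 'Marks': 78}])
    ages | JoinOn(marks, key_cols=('ID', 'Name')) | 'Show' >> beam.Map(print)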
Upvotes: 1