Reputation: 15
I'm trying to do an INNER JOIN between two Google BigQuery tables in Apache Beam (Python), but I haven't found a native way to do it easily.
The join result will populate a third BigQuery table, and for this use case the query has to run on Google Dataflow. The key of the first table (client) is the "id" column, and the key of the second table (purchase) is the "client_id" column.
1. Tables example (join condition: 'client_table.id = purchase_table.client_id'):
client_table
| id | name | country |
|----|-------------|---------|
| 1 | first user | usa |
| 2 | second user | usa |
purchase_table
| id | client_id | value |
|----|-------------|---------|
| 1 | 1 | 15 |
| 2 | 1 | 120 |
| 3 | 2 | 190 |
2. Code I'm trying to develop (the problem is in the second line of 'output'):
import apache_beam as beam

options = {'project': PROJECT,
           'runner': RUNNER,
           'region': REGION,
           'staging_location': 'gs://bucket/temp',
           'temp_location': 'gs://bucket/temp',
           'template_location': 'gs://bucket/temp/test_join'}
pipeline_options = beam.pipeline.PipelineOptions(flags=[], **options)
pipeline = beam.Pipeline(options = pipeline_options)
query_results_1 = (
    pipeline
    | 'ReadFromBQ_1' >> beam.io.Read(beam.io.ReadFromBigQuery(query="select id as client_id, name from client_table", use_standard_sql=True)))
query_results_2 = (
    pipeline
    | 'ReadFromBQ_2' >> beam.io.Read(beam.io.ReadFromBigQuery(query="select * from purchase_table", use_standard_sql=True)))
output = ( {'query_results_1':query_results_1,'query_results_2':query_results_2}
    | 'join' >> beam.GroupBy('client_id')
    | 'writeToBQ' >> beam.io.WriteToBigQuery(
        table=TABLE,
        dataset=DATASET,
        project=PROJECT,
        schema=SCHEMA,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))
pipeline.run()
3. Equivalent desired output in SQL:
SELECT a.name, b.value FROM client_table AS a INNER JOIN purchase_table AS b ON a.id = b.client_id;
Upvotes: 1
Views: 1780
Reputation: 2680
You could use either a CoGroupByKey or side inputs (as a broadcast join), depending on your key cardinality. If you have only a few keys with many elements each, I suggest the broadcast join, provided the table passed as the side input is small enough to fit in worker memory.
The first thing you'd need to do is to add a key to your PCollections after the BQ read:
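# The first query aliases id as client_id, so both reads are keyed on "client_id"
kv_1 = query_results_1 | "KeyClients" >> Map(lambda x: (x["client_id"], x))
kv_2 = query_results_2 | "KeyPurchases" >> Map(lambda x: (x["client_id"], x))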
Then you can do either the CoGBK or the broadcast join. As an example (since it is easier to follow), I am going to use the code from this session of Beam College. Note that in your case the value of each KV is a dictionary, so you'd need to make some modifications.
Data
jobs = [
    ("John", "Data Scientist"),
    ("Rebecca", "Full Stack Engineer"),
    ("John", "Data Engineer"),
    ("Alice", "CEO"),
    ("Charles", "Web Designer"),
    ("Ruben", "Tech Writer")
]
hobbies = [
    ("John", "Baseball"),
    ("Rebecca", "Football"),
    ("John", "Piano"),
    ("Alice", "Photoshop"),
    ("Charles", "Coding"),
    ("Rebecca", "Acting"),
    ("Rebecca", "Reading")
]
Join with CoGBK
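from apache_beam import CoGroupByKey, Create, FlatMap

def inner_join(element):
    # element is (key, {"jobs": [...], "hobbies": [...]}), as produced by CoGroupByKey
    name = element[0]
    jobs = element[1]["jobs"]
    hobbies = element[1]["hobbies"]
    # Cross product per key; empty when either side has no elements (inner join)
    joined = [{"name": name,
               "job": job,
               "hobbie": hobbie}
              for job in jobs for hobbie in hobbies]
    return joined

# p is an existing beam.Pipeline
jobs_create = p | "Create Jobs" >> Create(jobs)
hobbies_create = p | "Create Hobbies" >> Create(hobbies)
cogbk = {"jobs": jobs_create, "hobbies": hobbies_create} | CoGroupByKey()
join = cogbk | FlatMap(inner_join)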
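As a rough sketch of the modifications needed for dictionary rows, the same CoGroupByKey join could look like this for the client and purchase tables from the question. The in-memory `clients` and `purchases` lists stand in for the two BigQuery reads, and the tag names `clients`/`purchases` and the `run` helper are assumptions made for illustration, not part of the original code.

```python
def inner_join(element):
    """Expand one CoGroupByKey result into joined rows (inner join)."""
    _client_id, grouped = element
    # Cross product per key; yields nothing if either side is empty.
    return [
        {"name": client["name"], "value": purchase["value"]}
        for client in grouped["clients"]
        for purchase in grouped["purchases"]
    ]


def run():
    # Imported here so inner_join can be tested without Beam installed.
    import apache_beam as beam

    # Stand-ins for the rows returned by the two BigQuery reads (assumption).
    clients = [
        {"client_id": 1, "name": "first user"},
        {"client_id": 2, "name": "second user"},
    ]
    purchases = [
        {"id": 1, "client_id": 1, "value": 15},
        {"id": 2, "client_id": 1, "value": 120},
        {"id": 3, "client_id": 2, "value": 190},
    ]

    with beam.Pipeline() as p:
        kv_clients = (
            p
            | "Clients" >> beam.Create(clients)
            | "KeyClients" >> beam.Map(lambda row: (row["client_id"], row))
        )
        kv_purchases = (
            p
            | "Purchases" >> beam.Create(purchases)
            | "KeyPurchases" >> beam.Map(lambda row: (row["client_id"], row))
        )
        (
            {"clients": kv_clients, "purchases": kv_purchases}
            | "CoGBK" >> beam.CoGroupByKey()
            | "Join" >> beam.FlatMap(inner_join)
            | "Print" >> beam.Map(print)
        )
```

Calling `run()` executes the pipeline locally on the default runner; in the real pipeline the final `Print` step would be replaced by the `WriteToBigQuery` step from the question.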
Broadcast join with Side Inputs
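import apache_beam as beam
from apache_beam import Create, FlatMap, pvalue

def broadcast_inner_join(element, side_input):
    # element is a (name, job) pair; side_input maps name -> iterable of hobbies
    name = element[0]
    job = element[1]
    # Names missing from the side input produce no output rows (inner join)
    hobbies = side_input.get(name, [])
    joined = [{"name": name,
               "job": job,
               "hobbie": hobbie}
              for hobbie in hobbies]
    return joined

# p is an existing beam.Pipeline; group hobbies so each key is unique for AsDict
hobbies_create = (p | "Create Hobbies" >> Create(hobbies)
                    | beam.GroupByKey()
                  )
jobs_create = p | "Create Jobs" >> Create(jobs)
broadcast_join = jobs_create | FlatMap(broadcast_inner_join,
                                       side_input=pvalue.AsDict(hobbies_create))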
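For the dictionary rows in the question, a broadcast join might be sketched as follows, assuming the client table is the small side and that `client_id` is unique in it, so no GroupByKey is needed before `AsDict`. The in-memory lists, the `clients_by_id` parameter name, and the `run` helper are hypothetical stand-ins rather than part of the original code.

```python
def broadcast_inner_join(purchase, clients_by_id):
    """Join one purchase row against the broadcast client lookup."""
    client = clients_by_id.get(purchase["client_id"])
    if client is None:
        # No matching client: an inner join emits nothing for this row.
        return []
    return [{"name": client["name"], "value": purchase["value"]}]


def run():
    # Imported here so broadcast_inner_join can be tested without Beam installed.
    import apache_beam as beam

    # Stand-ins for the rows returned by the two BigQuery reads (assumption).
    clients = [
        {"client_id": 1, "name": "first user"},
        {"client_id": 2, "name": "second user"},
    ]
    purchases = [
        {"id": 1, "client_id": 1, "value": 15},
        {"id": 2, "client_id": 1, "value": 120},
        {"id": 3, "client_id": 2, "value": 190},
    ]

    with beam.Pipeline() as p:
        # Small side: keyed by client_id and handed to every worker as a dict.
        kv_clients = (
            p
            | "Clients" >> beam.Create(clients)
            | "KeyClients" >> beam.Map(lambda row: (row["client_id"], row))
        )
        (
            p
            | "Purchases" >> beam.Create(purchases)
            | "Join" >> beam.FlatMap(
                broadcast_inner_join,
                clients_by_id=beam.pvalue.AsDict(kv_clients),
            )
            | "Print" >> beam.Map(print)
        )
```

The larger table (purchases here) stays the main input and is never shuffled by key, which is the main reason to prefer this form when the other side is small.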
Upvotes: 4