HyperCube

Reputation: 15

Apache Beam | Python | Dataflow - How to join BigQuery collections with different keys?

I've run into the following problem: I'm trying to perform an INNER JOIN between two Google BigQuery tables in Apache Beam (Python) for a specific situation, but I haven't found a native way to do it easily.

I'm going to use this query's output to fill a third table on Google BigQuery, and for this situation I really need to run the query on Google Dataflow. The key of the first table (client) is the "id" column, and the key of the second table (purchase) is the "client_id" column.

1. Tables example (consider 'client_table.id = purchase_table.client_id'):

client_table

| id |    name     | country |
|----|-------------|---------|
| 1  | first user  |   usa   |
| 2  | second user |   usa   |

purchase_table

| id |  client_id  |  value  |
|----|-------------|---------|
| 1  |      1      |   15    |
| 2  |      1      |   120   |
| 3  |      2      |   190   |

2. Code I'm trying to develop (the problem is in the second line of 'output'):

options = {'project': PROJECT,
           'runner': RUNNER,
           'region': REGION,
           'staging_location': 'gs://bucket/temp',
           'temp_location': 'gs://bucket/temp',
           'template_location': 'gs://bucket/temp/test_join'}
pipeline_options = beam.pipeline.PipelineOptions(flags=[], **options)
pipeline = beam.Pipeline(options = pipeline_options)

query_results_1 = (
 pipeline 
    | 'ReadFromBQ_1' >> beam.io.ReadFromBigQuery(query="select id as client_id, name from client_table", use_standard_sql=True))

query_results_2 = (
 pipeline 
    | 'ReadFromBQ_2' >> beam.io.ReadFromBigQuery(query="select * from purchase_table", use_standard_sql=True))

output = ( {'query_results_1':query_results_1,'query_results_2':query_results_2} 
    | 'join' >> beam.GroupBy('client_id')
    | 'writeToBQ' >> beam.io.WriteToBigQuery(
        table=TABLE,
        dataset=DATASET,
        project=PROJECT,
        schema=SCHEMA,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))

pipeline.run()

3.Equivalent desired output in SQL:

SELECT a.name, b.value FROM client_table AS a INNER JOIN purchase_table AS b ON a.id = b.client_id;

Upvotes: 1

Views: 1780

Answers (1)

Iñigo

Reputation: 2680

You could use either a CoGroupByKey or side inputs (as a broadcast join) depending on your key cardinality. If you have a few keys with many elements each, I suggest the broadcast join.

The first thing you'd need to do is to add a key to your PCollections after the BQ read:

kv_1 = query_results_1 | Map(lambda x: (x["id"], x))
kv_2 = query_results_2 | Map(lambda x: (x["client_id"], x))

Then you can just do the CoGBK or broadcast join. As an example (since it would be easier to understand), I am going to use the code from this session of Beam College. Note that in your example the value of the KV pair is a dictionary, so you'd need to make some modifications.
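Since your BigQuery rows arrive as Python dictionaries, the join function for your case would look roughly like this (a sketch; the grouping keys `query_results_1`/`query_results_2` assume you keep the tags from your `output` snippet, and the field names come from your sample tables):

```python
def inner_join_rows(element):
    # element is one CoGroupByKey output:
    # (client_id, {"query_results_1": [client rows], "query_results_2": [purchase rows]})
    client_id, grouped = element
    return [
        {"name": client["name"], "value": purchase["value"]}
        for client in grouped["query_results_1"]
        for purchase in grouped["query_results_2"]
    ]

# Shape CoGroupByKey would emit for client_id 1 with the sample tables:
rows = inner_join_rows(
    (1, {"query_results_1": [{"client_id": 1, "name": "first user"}],
         "query_results_2": [{"id": 1, "client_id": 1, "value": 15},
                             {"id": 2, "client_id": 1, "value": 120}]}))
# rows == [{"name": "first user", "value": 15},
#          {"name": "first user", "value": 120}]
```

You would then apply it with `beam.FlatMap(inner_join_rows)` after the CoGroupByKey, in place of the `GroupBy` line that fails.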

Data

jobs = [
    ("John", "Data Scientist"),
    ("Rebecca", "Full Stack Engineer"),
    ("John", "Data Engineer"),
    ("Alice", "CEO"),
    ("Charles", "Web Designer"),
    ("Ruben", "Tech Writer")
]

hobbies = [
    ("John", "Baseball"),
    ("Rebecca", "Football"),
    ("John", "Piano"),
    ("Alice", "Photoshop"),
    ("Charles", "Coding"),
    ("Rebecca", "Acting"),
    ("Rebecca", "Reading")
]

Join with CoGBK

def inner_join(element):
  name = element[0]
  jobs = element[1]["jobs"]
  hobbies = element[1]["hobbies"]
  joined =  [{"name": name,
              "job": job,
              "hobbie": hobbie}
             for job in jobs for hobbie in hobbies]
  return joined


jobs_create = p | "Create Jobs" >> Create(jobs)
hobbies_create = p | "Create Hobbies" >> Create(hobbies)

cogbk = {"jobs": jobs_create, "hobbies": hobbies_create} | CoGroupByKey()

join = cogbk | FlatMap(inner_join)
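For reference, each CoGroupByKey output element is a `(key, {tag: [values]})` pair, and `inner_join` emits the per-key cross product, which is exactly inner-join semantics. A quick pure-Python check of that logic:

```python
def inner_join(element):
    name = element[0]
    jobs = element[1]["jobs"]
    hobbies = element[1]["hobbies"]
    return [{"name": name, "job": job, "hobbie": hobbie}
            for job in jobs for hobbie in hobbies]

# One CoGroupByKey output element for "John" with the data above:
element = ("John", {"jobs": ["Data Scientist", "Data Engineer"],
                    "hobbies": ["Baseball", "Piano"]})
assert len(inner_join(element)) == 4  # 2 jobs x 2 hobbies

# "Ruben" has a job but no hobbies, so he produces no rows: inner-join behavior.
assert inner_join(("Ruben", {"jobs": ["Tech Writer"], "hobbies": []})) == []
```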

Broadcast join with Side Inputs

def broadcast_inner_join(element, side_input):
  name = element[0]
  job = element[1]
  hobbies = side_input.get(name, [])

  joined =  [{"name": name,
              "job": job,
              "hobbie": hobbie}
             for hobbie in hobbies]
  return joined

hobbies_create = (p | "Create Hobbies" >> Create(hobbies) 
                    | beam.GroupByKey()
                )

jobs_create = p | "Create Jobs" >> Create(jobs)

broadcast_join = jobs_create | FlatMap(broadcast_inner_join, 
                                        side_input=pvalue.AsDict(hobbies_create))

Upvotes: 4
