Question: I want to do an operation similar to `ARRAY_AGG(STRUCT(table))` in Beam for Python.
Background: Similar to this thread, I'm running a Beam pipeline in Python. I have two tables, one with IDs and a sum:
ID | total |
---|---|
1 | 10 |
2 | 15 |
3 | 5 |
And a breakdown table with one row per item:
table1_id | item_name | item_price |
---|---|---|
1 | a | 2 |
1 | b | 8 |
2 | c | 5 |
2 | d | 5 |
2 | e | 5 |
3 | f | 7 |
I want the output in BigQuery to look like this, with one row per ID and `item` as a nested, repeated record:

| id | total | item.item_name | item.item_price |
|---|---|---|---|
| 1 | 10 | a | 2 |
| | | b | 8 |
| 2 | 15 | c | 5 |
| | | d | 5 |
| | | e | 5 |
| 3 | 5 | f | 7 |
In BigQuery this can be solved with `ARRAY_AGG(STRUCT(line_items))`, grouping by `table1_id` and then joining the result onto table1. Is there a smart way to do this in Beam with Python?
(I assume it involves a GroupBy, but I haven't been able to get it working.)
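To pin down the target shape, here is a minimal plain-Python sketch (not Beam code) of what the BigQuery approach computes: group the breakdown rows by `table1_id` into a list of `(item_name, item_price)` records, then attach each list to the matching ID row. The function name `array_agg_join`, the table names in the docstring, and the choice to give an ID without items an empty list are assumptions. Items keep their input order here, whereas `ARRAY_AGG` without `ORDER BY` does not guarantee any order.

```python
from collections import defaultdict
from typing import Any, Dict, List


def array_agg_join(ids: List[Dict[str, Any]],
                   items: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Plain-Python model of the BigQuery approach described in the question.

    Roughly (table names are hypothetical):
      SELECT t1.ID AS id, t1.total, agg.item
      FROM table1 AS t1
      LEFT JOIN (
        SELECT table1_id, ARRAY_AGG(STRUCT(item_name, item_price)) AS item
        FROM breakdown GROUP BY table1_id
      ) AS agg ON agg.table1_id = t1.ID
    """
    # GROUP BY table1_id + ARRAY_AGG(STRUCT(...)): one list of records per ID.
    grouped: Dict[Any, List[Dict[str, Any]]] = defaultdict(list)
    for row in items:
        grouped[row['table1_id']].append(
            {'item_name': row['item_name'], 'item_price': row['item_price']})
    # Join onto the IDs table; an ID with no items gets an empty list here.
    return [{'id': r['ID'], 'total': r['total'], 'item': grouped.get(r['ID'], [])}
            for r in ids]
```

A result like this can also serve as the expected value when unit-testing a Beam version of the same logic.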
Answer:
Here is complete code that implements your solution in a unit test:
from typing import Any, Dict, Iterable, List, Tuple

import apache_beam as beam
from apache_beam import Create
from apache_beam.pvalue import AsList
from apache_beam.testing.test_pipeline import TestPipeline


def to_item_tuple_with_total(item_tuple: Tuple[int, Iterable[Dict]], ids: List[Dict]) -> Tuple[Dict, List[Dict]]:
    # item_tuple is (table1_id, grouped items) as emitted by GroupBy.
    table_id = item_tuple[0]
    # Find the row of the IDs side input with the same ID and take its total.
    total = next(id_element for id_element in ids if id_element['ID'] == table_id)['total']
    # Materialize the grouped iterable as a list (a repeated field when written).
    return {'id': table_id, 'total': total}, list(item_tuple[1])


def to_item_result(item_tuple: Tuple[Dict, List[Dict]]) -> Dict:
    # Flatten (key, items) into one output row with a nested 'item' list.
    item_key = item_tuple[0]
    return {'id': item_key['id'], 'total': item_key['total'], 'item': item_tuple[1]}


def test_pipeline():
    with TestPipeline() as p:
        ids = [
            {'ID': 1, 'total': 10},
            {'ID': 2, 'total': 15},
            {'ID': 3, 'total': 5},
        ]
        items = [
            {'table1_id': 1, 'item_name': 'a', 'item_price': 2},
            {'table1_id': 1, 'item_name': 'b', 'item_price': 8},
            {'table1_id': 2, 'item_name': 'c', 'item_price': 5},
            {'table1_id': 2, 'item_name': 'd', 'item_price': 5},
            {'table1_id': 2, 'item_name': 'e', 'item_price': 5},
            {'table1_id': 3, 'item_name': 'f', 'item_price': 7},
        ]

        # The IDs table is passed to the Map below as a list side input.
        ids_side_inputs = p | 'Side input IDs' >> Create(ids)

        result = (p
                  | 'Input items' >> Create(items)
                  # Group breakdown rows by their parent ID: (table1_id, items).
                  | beam.GroupBy(lambda i: i['table1_id'])
                  | beam.Map(to_item_tuple_with_total, ids=AsList(ids_side_inputs))
                  | beam.Map(to_item_result)
                  )

        result | "Print outputs" >> beam.Map(print)
The printed result (reformatted for readability) is:
{
'id': 1,
'total': 10,
'item':
[
{'table1_id': 1, 'item_name': 'a', 'item_price': 2},
{'table1_id': 1, 'item_name': 'b', 'item_price': 8}
]
}
{
'id': 2,
'total': 15,
'item':
[
{'table1_id': 2, 'item_name': 'c', 'item_price': 5},
{'table1_id': 2, 'item_name': 'd', 'item_price': 5},
{'table1_id': 2, 'item_name': 'e', 'item_price': 5}
]
}
{
'id': 3,
'total': 5,
'item':
[
{'table1_id': 3, 'item_name': 'f', 'item_price': 7}
]
}
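The rows above still carry `table1_id` inside each nested item, while the desired BigQuery output only has `item.item_name` and `item.item_price`. A minimal sketch, assuming the field types and modes shown: describe `item` as a `REPEATED` `RECORD` (BigQuery's array of structs) in the dict form that `beam.io.WriteToBigQuery` accepts as its `schema` argument, and project each row onto that schema before writing. `OUTPUT_SCHEMA` and `project_to_schema` are hypothetical names.

```python
from typing import Any, Dict, List

# Hypothetical schema for the desired output table, in the dict form
# ({'fields': [...]}) that beam.io.WriteToBigQuery accepts for `schema`.
# Field types and modes are assumptions based on the sample data.
OUTPUT_SCHEMA: Dict[str, Any] = {
    'fields': [
        {'name': 'id', 'type': 'INTEGER', 'mode': 'REQUIRED'},
        {'name': 'total', 'type': 'INTEGER', 'mode': 'NULLABLE'},
        {
            # REPEATED + RECORD is how BigQuery stores an ARRAY<STRUCT<...>>.
            'name': 'item',
            'type': 'RECORD',
            'mode': 'REPEATED',
            'fields': [
                {'name': 'item_name', 'type': 'STRING', 'mode': 'NULLABLE'},
                {'name': 'item_price', 'type': 'INTEGER', 'mode': 'NULLABLE'},
            ],
        },
    ]
}


def project_to_schema(row: Dict[str, Any], fields: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Return a copy of `row` keeping only the keys declared in `fields`.

    Recurses into RECORD fields, so extra nested keys such as `table1_id`
    inside each item are dropped. Keys missing from `row` are skipped.
    """
    out: Dict[str, Any] = {}
    for field in fields:
        name = field['name']
        if name not in row:
            continue
        value = row[name]
        if value is not None and field['type'] == 'RECORD':
            sub_fields = field['fields']
            if field.get('mode') == 'REPEATED':
                value = [project_to_schema(v, sub_fields) for v in value]
            else:
                value = project_to_schema(value, sub_fields)
        out[name] = value
    return out
```

One possible way to wire it in (not tested against a real table): `result | beam.Map(project_to_schema, OUTPUT_SCHEMA['fields']) | beam.io.WriteToBigQuery('project:dataset.table', schema=OUTPUT_SCHEMA)`, where the table spec is a placeholder.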
Some explanations:

- `items`: the input PCollection (in a real pipeline, read from BigQuery).
- `ids`: a side-input PCollection (also read from BigQuery).
- `GroupBy` on `table1_id` groups the `items` PCollection by parent ID.
- A `Map` with the IDs as a list side input (`AsList`) links each `total` to its items.
- A final `Map` returns a `Dict` with the expected fields, ready to be saved to BigQuery.