Jh123

Reputation: 93

Nested structures in beam

Question: I want to do a similar operation to ARRAY_AGG(STRUCT(table)) in beam for python.

Background: Similar to this thread I'm running a beam pipeline in python. I have two tables, one with ids and a sum:

ID total
1 10
2 15
3 5

And one breakdown table where each row is:

table1_id item_name item_price
1 a 2
1 b 8
2 c 5
2 d 5
2 e 5
3 f 7

I want the output in BigQuery to look like:

id  total  item.item_name  item.item_price
1   10     a               2
           b               8
2   15     c               5
           d               5
           e               5
3   5      f               7

In BQ this is solvable by doing an ARRAY_AGG(STRUCT(line_items)) grouped by table1_id, which can then be joined to table1. Is there a smart way to do this in Beam with Python?

(I assume it involves GroupBy, but I haven't been able to get it working.)

Upvotes: 0

Views: 218

Answers (1)

Mazlum Tosun

Reputation: 6572

Here is a complete example that implements your solution as a unit test:

from typing import List, Dict, Tuple, Any

import apache_beam as beam
from apache_beam import Create
from apache_beam.pvalue import AsList
from apache_beam.testing.test_pipeline import TestPipeline

def test_pipeline():
    with TestPipeline() as p:
        # Stand-in for table1 (one row per ID with its total).
        ids = [
            {'ID': 1, 'total': 10},
            {'ID': 2, 'total': 15},
            {'ID': 3, 'total': 5},
        ]

        # Stand-in for the breakdown table (several rows per table1_id).
        items = [
            {'table1_id': 1, 'item_name': 'a', 'item_price': 2},
            {'table1_id': 1, 'item_name': 'b', 'item_price': 8},
            {'table1_id': 2, 'item_name': 'c', 'item_price': 5},
            {'table1_id': 2, 'item_name': 'd', 'item_price': 5},
            {'table1_id': 2, 'item_name': 'e', 'item_price': 5},
            {'table1_id': 3, 'item_name': 'f', 'item_price': 7},
        ]

        ids_side_inputs = p | 'Side input IDs' >> Create(ids)

        result = (
            p
            | 'Input items' >> Create(items)
            # Produces (table1_id, grouped items) tuples.
            | beam.GroupBy(lambda i: i['table1_id'])
            | beam.Map(to_item_tuple_with_total, ids=AsList(ids_side_inputs))
            | beam.Map(to_item_result)
        )

        result | "Print outputs" >> beam.Map(print)


def to_item_tuple_with_total(item_tuple: Tuple[int, Any], ids: List[Dict]) -> Tuple[Dict, List[Dict]]:
    # Find the total whose ID matches the group key.
    table_id = item_tuple[0]
    total = next(id_element for id_element in ids if id_element['ID'] == table_id)['total']

    return {'id': table_id, 'total': total}, item_tuple[1]


def to_item_result(item_tuple: Tuple[Dict, Any]) -> Dict:
    # Merge the key fields and the grouped items into one nested row.
    item_key = item_tuple[0]
    return {'id': item_key['id'], 'total': item_key['total'], 'item': item_tuple[1]}

The result is:

{
  'id': 1, 
  'total': 10, 
  'item': 
  [
     {'table1_id': 1, 'item_name': 'a', 'item_price': 2},
     {'table1_id': 1, 'item_name': 'b', 'item_price': 8}
  ]
}
{
  'id': 2, 
  'total': 15, 
  'item': 
  [
     {'table1_id': 2, 'item_name': 'c', 'item_price': 5},
     {'table1_id': 2, 'item_name': 'd', 'item_price': 5},
     {'table1_id': 2, 'item_name': 'e', 'item_price': 5}
  ]
}
{
  'id': 3, 
  'total': 5, 
  'item': 
  [
     {'table1_id': 3, 'item_name': 'f', 'item_price': 7}
  ]
}
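
Rows shaped like this correspond to a BigQuery table in which item is a repeated RECORD column. As a rough sketch, the schema could be described with a dict like the one below. The field types and modes are assumptions based on the sample data, and field_names is only a hypothetical helper to show the dotted column names the question asks for. A dict of this form can be passed as the schema argument of beam.io.WriteToBigQuery.

```python
from typing import Any, Dict, List

# Assumed schema for the nested rows; types and modes are guesses from the sample data.
NESTED_SCHEMA: Dict[str, Any] = {
    'fields': [
        {'name': 'id', 'type': 'INTEGER', 'mode': 'REQUIRED'},
        {'name': 'total', 'type': 'INTEGER', 'mode': 'NULLABLE'},
        {
            'name': 'item',
            'type': 'RECORD',
            'mode': 'REPEATED',  # one entry per breakdown row
            'fields': [
                {'name': 'table1_id', 'type': 'INTEGER', 'mode': 'NULLABLE'},
                {'name': 'item_name', 'type': 'STRING', 'mode': 'NULLABLE'},
                {'name': 'item_price', 'type': 'INTEGER', 'mode': 'NULLABLE'},
            ],
        },
    ]
}


def field_names(schema: Dict[str, Any]) -> List[str]:
    """Return dotted column paths (hypothetical helper, not part of Beam)."""
    names: List[str] = []
    for field in schema['fields']:
        names.append(field['name'])
        if field['type'] == 'RECORD':
            # Recurse into the nested record and prefix with the parent name.
            names.extend(f"{field['name']}.{sub}" for sub in field_names(field))
    return names
```

Depending on the runner, the grouped values may arrive as an iterable rather than a plain list, so wrapping them in list(...) before writing is a safe step.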

Some explanations:

  • I simulated the items input PCollection that would come from BigQuery
  • I simulated the ids side input PCollection that would come from BigQuery
  • I added a GroupBy on table1_id to the items PCollection
  • I added a Map that takes the list of IDs as a side input to link each total to its items
  • The last Map returns a Dict with the expected fields, ready to be saved to BigQuery
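
The steps above scan the whole IDs list once per group. A possible variation, sketched here under my own assumptions, is to pass the totals as a dict side input with beam.pvalue.AsDict so that each lookup is a key access. The names to_nested_row and run are hypothetical, and dropping table1_id from the nested items is a choice made to match the output requested in the question.

```python
from typing import Any, Dict, Iterable, Tuple


def to_nested_row(group: Tuple[int, Iterable[Dict[str, Any]]],
                  totals: Dict[int, int]) -> Dict[str, Any]:
    """Build one nested row from a (table1_id, items) group and an ID -> total mapping."""
    table_id, items = group
    return {
        'id': table_id,
        'total': totals[table_id],  # key lookup instead of scanning a list
        'item': [
            {'item_name': i['item_name'], 'item_price': i['item_price']}
            for i in items
        ],
    }


def run() -> None:
    # Imported here so that to_nested_row can be tried without Beam installed.
    import apache_beam as beam

    ids = [{'ID': 1, 'total': 10}, {'ID': 2, 'total': 15}, {'ID': 3, 'total': 5}]
    items = [
        {'table1_id': 1, 'item_name': 'a', 'item_price': 2},
        {'table1_id': 1, 'item_name': 'b', 'item_price': 8},
        {'table1_id': 2, 'item_name': 'c', 'item_price': 5},
        {'table1_id': 3, 'item_name': 'f', 'item_price': 7},
    ]

    with beam.Pipeline() as p:
        # AsDict expects a PCollection of (key, value) pairs.
        totals = (
            p
            | 'Create IDs' >> beam.Create(ids)
            | 'To (ID, total)' >> beam.Map(lambda row: (row['ID'], row['total']))
        )

        (
            p
            | 'Create items' >> beam.Create(items)
            | 'Group by table1_id' >> beam.GroupBy(lambda i: i['table1_id'])
            | 'Attach total' >> beam.Map(to_nested_row, totals=beam.pvalue.AsDict(totals))
            | 'Print' >> beam.Map(print)
        )
```

Calling run() executes the pipeline on the default runner. A side input has to fit in worker memory, so for a large table1 a CoGroupByKey join on the ID is probably the better fit.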

Upvotes: 1
