Reputation: 460
I'm trying to rename BigQuery columns in an Apache Beam pipeline in Python, as in the following example: I have one PCollection with the full data and another one with only 3 fields, renamed (col1 to col1.2, col2 to col2.2, and so on).
How can I apply my filter correctly to get that second PCollection with the renamed rows?
def is_filtered(row):
    row['col1'] == row['col1.2']
    row['col2'] == row['col2.2']
    row['col3'] == row['col3.2']
    yield row
with beam.Pipeline() as pipeline:
    query = open('query.sql', 'r')
    bq_source = beam.io.BigQuerySource(query=query.read(),
                                       use_standard_sql=True)

    main_table = \
        pipeline \
        | 'ReadBQData' >> beam.io.Read(bq_source)

    cycle_table = (
        pipeline
        | 'FilterMainTable' >> beam.Filter(is_filtered, main_table))
I also thought about using Partition, but the Partition examples I found were more about partitioning the content of the rows rather than the rows themselves.
Upvotes: 1
Views: 512
Reputation: 478
The Filter transform creates a PCollection with some rows of the source removed (its callable is expected to return a boolean: True to keep the element). Use the Map transform if you want a PCollection with rows transformed 1:1. Here is an example:
def filter_columns(row):
    return {'col1.2': row['col1'],
            'col2.2': row['col2'],
            'col3.2': row['col3']}
with beam.Pipeline() as pipeline:
    query = open('query.sql', 'r')
    bq_source = beam.io.BigQuerySource(query=query.read(),
                                       use_standard_sql=True)

    main_table = \
        pipeline \
        | 'ReadBQData' >> beam.io.Read(bq_source)

    cycle_table = (
        main_table
        | 'FilterMainTable' >> beam.Map(filter_columns))
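To make the contrast concrete, here is a minimal sketch (plain Python, no pipeline needed) of the two kinds of callables: a Filter predicate returns a boolean, while a Map callable returns the transformed element. The names `keep_matching` and `rename_columns` are hypothetical, chosen for illustration:

```python
def keep_matching(row):
    # Filter-style predicate: return True to keep the row,
    # False to drop it. This is what beam.Filter expects.
    return (row['col1'] == row['col1.2']
            and row['col2'] == row['col2.2']
            and row['col3'] == row['col3.2'])

def rename_columns(row):
    # Map-style callable: return the transformed element.
    # This is what beam.Map expects.
    return {'col1.2': row['col1'],
            'col2.2': row['col2'],
            'col3.2': row['col3']}

# Inside a pipeline these would be applied as:
#   pcoll | 'Keep' >> beam.Filter(keep_matching)
#   pcoll | 'Rename' >> beam.Map(rename_columns)
```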
Upvotes: 3