Reputation: 11
I'm writing a Dataflow pipeline that splits a stream into tables dynamically named by the event_name and event_date in the data.
The tables are being created with the correct names, but the data fails to write, with the formatting error below.
"Unknown name "json" at 'rows[0]': Proto field is not repeating, cannot start list"
The print-record stage produces this log immediately before WriteToBigQuery is called, which looks correct to me:
About to write to BigQuery - Table: PROJECT_ID:DATASET_NAME.TABLE_NAME, Record: [{'event_name': 'scroll', 'event_date': '20241118', 'user_id': '', 'platform': 'WEB'}]
(For clarity, I have tried removing the square brackets, with the same result.)
Here is the pipeline code:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
from apache_beam.io.gcp.bigquery import WriteToBigQuery
from apache_beam.transforms.window import FixedWindows
import logging

def log_before_write(element):
    table_name, record = element
    logging.info(f"About to write to BigQuery - Table: {table_name}, Record: {record}")
    return element

class SplitByParameter(beam.DoFn):
    def process(self, element):
        event_name = element['event_name']
        event_date = element['event_date']
        yield (event_name, event_date, element)

def format_table_name(element):
    event_name, event_date, record = element
    sanitized_event_name = event_name.replace(' ', '_')
    sanitized_event_date = event_date.replace(' ', '_')
    table_name = f'PROJECT_ID:DATASET.{sanitized_event_name}_{sanitized_event_date}'
    return table_name, record

def split_records(element):
    table_name, record = element
    json_record = [{
        'event_name': str(record.get('event_name', '')) if record.get('event_name') is not None else '',
        'event_date': str(record.get('event_date', '')) if record.get('event_date') is not None else '',
        'user_id': str(record.get('user_id', '')) if record.get('user_id') is not None else '',
        'platform': str(record.get('platform', '')) if record.get('platform') is not None else ''
    }]
    yield (table_name, json_record)

def print_record(record):
    logging.info(f"Record before WriteToBigQuery: {record}")
    return record

def run(argv=None):
    options = PipelineOptions(argv)
    options.view_as(StandardOptions).streaming = True
    p = beam.Pipeline(options=options)

    # Define schema for BigQuery (this needs to match your record structure)
    schema = 'event_name:STRING, event_date:STRING, user_id:STRING, platform:STRING'

    # Read from BigQuery, apply windowing, and process records
    (p
     | 'ReadFromBigQuery' >> beam.io.ReadFromBigQuery(query=f'''
            SELECT *
            FROM `PROJECT_ID.DATASET.TABLE`
            WHERE _TABLE_SUFFIX = FORMAT_TIMESTAMP('%Y%m%d', CURRENT_TIMESTAMP())
        ''', use_standard_sql=True)
     | 'ApplyWindowing' >> beam.WindowInto(FixedWindows(60))  # 60-second window
     | 'SplitByParameter' >> beam.ParDo(SplitByParameter())  # Split by event_name and event_date
     | 'FormatTableName' >> beam.Map(format_table_name)  # Format the table name
     | 'LogBeforeFlatMap' >> beam.Map(lambda x: logging.info(f'Before FlatMap: {x}') or x)
     | 'SplitRecords' >> beam.FlatMap(split_records)  # Convert record to desired format
     | 'LogBeforeWrite' >> beam.Map(log_before_write)
     | 'PrintRecord' >> beam.Map(print_record)  # Print records before writing to BigQuery
     | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
            table=lambda x: x[0],  # Table name is the first element of the tuple
            schema=schema,  # Use the schema defined above
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND  # Append data to existing tables
        )
    )
    p.run()

if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()
Not sure where to go from here. I've tried a bunch of different formats, but there are only so many configurations that have a chance of working.
Upvotes: 0
Views: 34
Reputation: 1
WriteToBigQuery expects each element of its input PCollection to be a dictionary representing a single row, but in this pipeline it is receiving a tuple of (table_name, json_record).
The PCollection passed to WriteToBigQuery should contain only the row dictionaries, and the table callable should generate the table name from the row itself.
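The element shape changes roughly like this (values taken from the example row in the question):

# What WriteToBigQuery currently receives per element: a tuple wrapping a list
('PROJECT_ID:DATASET.scroll_20241118', [{'event_name': 'scroll', 'event_date': '20241118', 'user_id': '', 'platform': 'WEB'}])

# What it should receive per element: a single plain row dict
{'event_name': 'scroll', 'event_date': '20241118', 'user_id': '', 'platform': 'WEB'}

A minimal working example: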
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
import logging

def format_table_name(element):
    event_name = element['event_name']
    event_date = element['event_date']
    sanitized_event_name = event_name.replace(' ', '_')
    sanitized_event_date = event_date.replace(' ', '_')
    table_name = f'PROJECT_ID:DATASET.{sanitized_event_name}_{sanitized_event_date}'
    return table_name

def split_records(element):
    json_record = {
        'event_name': str(element.get('event_name', '')) if element.get('event_name') is not None else '',
        'event_date': str(element.get('event_date', '')) if element.get('event_date') is not None else '',
        'user_id': str(element.get('user_id', '')) if element.get('user_id') is not None else '',
        'platform': str(element.get('platform', '')) if element.get('platform') is not None else ''
    }
    yield json_record

def run(argv=None):
    options = PipelineOptions(argv)
    options.view_as(StandardOptions).streaming = True
    p = beam.Pipeline(options=options)

    # Define schema for BigQuery (this needs to match your record structure)
    schema = 'event_name:STRING, event_date:STRING, user_id:STRING, platform:STRING'

    (p
     | beam.Create([
           {
               'event_name': 'scroll',
               'event_date': '20241118',
               'user_id': 'user1',
               'platform': 'WEB'
           },
           {
               'event_name': 'click',
               'event_date': '20241117',
               'user_id': 'user2',
               'platform': 'WEB'
           }
       ])
     | 'SplitRecords' >> beam.FlatMap(split_records)  # Convert each record to a plain row dict
     | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
           table=format_table_name,  # Derive the destination table from each row dict
           schema=schema,  # Use the schema defined above
           write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND  # Append data to existing tables
       )
    )
    p.run()

if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()
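If you want to keep the original ReadFromBigQuery source from the question, here is a rough, untested sketch of how the same change slots in. It reuses split_records, format_table_name and schema from above, plus the FixedWindows import from the question; SplitByParameter and the (table_name, record) tuple are no longer needed, because the table callable receives each row dict directly:

(p
 | 'ReadFromBigQuery' >> beam.io.ReadFromBigQuery(
        query='''
            SELECT *
            FROM `PROJECT_ID.DATASET.TABLE`
            WHERE _TABLE_SUFFIX = FORMAT_TIMESTAMP('%Y%m%d', CURRENT_TIMESTAMP())
        ''',
        use_standard_sql=True)
 | 'ApplyWindowing' >> beam.WindowInto(FixedWindows(60))  # 60-second window, as in the question
 | 'SplitRecords' >> beam.FlatMap(split_records)  # Emits plain row dicts
 | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
        table=format_table_name,  # Derives the destination table from each row dict
        schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
)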
Upvotes: 0