Rick Rickles
Rick Rickles

Reputation: 11

Apache_beam write to big query json payload error

Writing a pipeline that splits a stream into tables dynamically named by the event_name and event_date in the data, in Dataflow.

The tables are being created, with the correct name, but the data is failing to be written, citing the below formatting error.

"Unknown name "json" at 'rows[0]': Proto field is not repeating, cannot start list"

The print record stage provides this log, immediately before writetobigquery is called - which to me looks correct:

About to write to BigQuery - Table: PROJECT_ID:DATASET_NAME.TABLE_NAME, Record: [{'event_name': 'scroll', 'event_date': '20241118', 'user_id': '', 'platform': 'WEB'}]

(for clarity i have tried removing square brackets, with the same result)

Here is the pipeline code

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
from apache_beam.io.gcp.bigquery import WriteToBigQuery
from apache_beam.transforms.window import FixedWindows
import logging

def log_before_write(element):
    table_name, record = element
    logging.info(f"About to write to BigQuery - Table: {table_name}, Record: {record}")
    return element 

class SplitByParameter(beam.DoFn):
    def process(self, element):
        event_name = element['event_name']
        event_date = element['event_date']
        yield (event_name, event_date, element)

def format_table_name(element):
    event_name, event_date, record = element
    sanitized_event_name = event_name.replace(' ', '_')
    sanitized_event_date = event_date.replace(' ', '_')
    table_name = f'PROJECT_ID:DATASET.{sanitized_event_name}_{sanitized_event_date}'
    return table_name, record


def split_records(element):
    table_name, record = element

    
    json_record = [{
    'event_name': str(record.get('event_name', '')) if record.get('event_name') is not None else '',
    'event_date': str(record.get('event_date', '')) if record.get('event_date') is not None else '',
    'user_id': str(record.get('user_id', '')) if record.get('user_id') is not None else '',
    'platform': str(record.get('platform', '')) if record.get('platform') is not None else ''
    }]

    yield (table_name,json_record)

def print_record(record):
    logging.info(f"Record before WriteToBigQuery: {record}")
    return record

def run(argv=None):
    options = PipelineOptions(argv)
    options.view_as(StandardOptions).streaming = True
    p = beam.Pipeline(options=options)

    # Define schema for BigQuery (this needs to match your record structure)
    schema = 'event_name:STRING, event_date:STRING, user_id:STRING, platform:STRING'

    # Read from BigQuery, apply windowing, and process records
    (p
     | 'ReadFromBigQuery' >> beam.io.ReadFromBigQuery(query=f'''
        SELECT *
        FROM `PROJECT_ID.DATASET.TABLE`
        WHERE _TABLE_SUFFIX = FORMAT_TIMESTAMP('%Y%m%d', CURRENT_TIMESTAMP())
       ''', use_standard_sql=True)
     | 'ApplyWindowing' >> beam.WindowInto(FixedWindows(60))  # 60-second window
     | 'SplitByParameter' >> beam.ParDo(SplitByParameter())  # Split by event_name and event_date
     | 'FormatTableName' >> beam.Map(format_table_name)  # Format the table name
     | 'LogBeforeFlatMap' >> beam.Map(lambda x: logging.info(f'Before FlatMap: {x}') or x)
     | 'SplitRecords' >> beam.FlatMap(split_records)  # Convert record to desired format
     | 'LogBeforeWrite' >> beam.Map(log_before_write)
     | 'PrintRecord' >> beam.Map(print_record)  # Print records before writing to BigQuery
     | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
           table=lambda x: x[0],  # Table name is the first element of the tuple
           schema=schema,  # Use the schema defined above
           write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND  # Append data to existing tables
       )
    )

    p.run()

if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()

Not sure where to go from here...

I've tried a bunch of different formatting, but there's only so many configs that have a chance of working.

Upvotes: 0

Views: 34

Answers (1)

WriteToBigQuery expects to process json fields, but in this pipeline it is receiving a tuple of (table_name, json_field).

The PCollection parsed by WriteToBigQuery needs to be a json field, and the table name function should genererate the table name using the json field as an input like

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
import logging

def format_table_name(element):
    event_name = element['event_name']
    event_date = element['event_date']
    sanitized_event_name = event_name.replace(' ', '_')
    sanitized_event_date = event_date.replace(' ', '_')
    table_name = f'PROJECT_ID:DATASET.{sanitized_event_name}_{sanitized_event_date}'
    return table_name


def split_records(element):
    json_record = {
    'event_name': str(element.get('event_name', '')) if element.get('event_name') is not None else '',
    'event_date': str(element.get('event_date', '')) if element.get('event_date') is not None else '',
    'user_id': str(element.get('user_id', '')) if element.get('user_id') is not None else '',
    'platform': str(element.get('platform', '')) if element.get('platform') is not None else ''
    }

    yield json_record

def run(argv=None):
    options = PipelineOptions(argv)
    options.view_as(StandardOptions).streaming = True
    p = beam.Pipeline(options=options)

    # Define schema for BigQuery (this needs to match your record structure)
    schema = 'event_name:STRING, event_date:STRING, user_id:STRING, platform:STRING'

    (p
     | beam.Create([
         {
            'event_name': 'scroll',
            'event_date': '20241118',
            'user_id': 'user1',
            'platform': 'WEB'
        },
        {
            'event_name': 'click',
            'event_date': '20241117',
            'user_id': 'user2',
            'platform': 'WEB'
        }
     ])
     | 'SplitRecords' >> beam.FlatMap(split_records)  # Convert record to desired format
     | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
           table=format_table_name,  # Table name is the first element of the tuple
           schema=schema,  # Use the schema defined above
           write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND  # Append data to existing tables
       )
    )

    p.run()

if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()

Upvotes: 0

Related Questions