Koichi Sakamaki

How to dynamically define a BQ schema in Dataflow / Apache Beam

I'm new to Dataflow/Apache Beam. Here, I'm trying to set the BQ schema dynamically by passing resource_name to the get_schema method. When I run this locally using DirectRunner, it works fine and I can see the data ingested into the BQ table. However, when I save this script as a Dataflow template and use Cloud Run Functions to load and execute the template via the Dataflow API, it fails with an error indicating that the schema is empty, ostensibly because get_schema is invoked before runtime, when resource_name is still a RuntimeValueProvider. Incidentally, hardcoding the schema instead of calling get_schema makes it work.
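
For context, here's a simplified sketch of what I believe is happening at template-creation time (not my actual code; I'm constructing the RuntimeValueProvider by hand purely for illustration):

from apache_beam.options.value_provider import RuntimeValueProvider

# Stand-in for what get_schema() receives while the template graph is built:
# the --resource option is a RuntimeValueProvider object, not a plain string.
resource_name = RuntimeValueProvider(
    option_name='resource', value_type=str, default_value=None)

print(resource_name.is_accessible())  # False: the value only exists once the job runs
print(resource_name == 'adImages')    # False: it's a ValueProvider object, not the string

# resource_name.get() would raise here, because the value isn't available yet.
# So get_schema() falls through to its else branch and returns None, and
# WriteToBigQuery ends up with an empty schema when the template is staged.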

Any insight into, or solution for, setting the BQ schema dynamically in Dataflow would be greatly appreciated. I'm also curious why it works when running locally but not when it's executed as a template.

Apologies in advance if I haven't provided enough info/code/error logs. Happy to provide more upon request.


import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions

logger = logging.getLogger(__name__)

# ad_image_schema, creatives_schema, ads_schema, igposts_schema, campaigns_schema,
# AddKeyValuePairFn and parse_json are defined elsewhere in the script (omitted here).


# Function to determine the schema based on the API object
def get_schema(resource_name):
    logger.info('what is resource name %s', resource_name)
    if resource_name == 'adImages':
        return ad_image_schema
    elif resource_name == 'creatives':
        return creatives_schema
    elif resource_name == 'ads':
        return ads_schema
    elif resource_name == 'igposts':
        return igposts_schema
    elif resource_name == 'campaigns':
        return campaigns_schema
    else:
        logger.error(f"META API Object {resource_name} not recognized")
        return None


class MyExamplePipelineOptions(PipelineOptions):

    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument(
            '--input',
            type=str,
            help='Path of the file to read from',
        )
        parser.add_value_provider_argument(
            '--output',
            type=str,
            help='Where to write the increased value props',
        )
        parser.add_value_provider_argument(
            '--resource',
            type=str,
            help='Meta API Resource Name',
        )
        parser.add_value_provider_argument(
            '--project_id',
            type=str,
            help='GCP project ID',
        )
        parser.add_value_provider_argument(
            '--gcp_region',
            type=str,
            help='GCP region',
        )

        parser.add_value_provider_argument(
            '--adaccount_id',
            type=str,
            help='Meta Ad Account ID',
        )


def execute_pipeline(        
    options: PipelineOptions,
    input_gcs_path,
    output_bq_table,
    resource_name,
    account_id
):
    
    with beam.Pipeline(options=options) as p:
        input_data = (
            p
            | 'Read from GCS' >> beam.io.ReadFromText(input_gcs_path)  # Read JSON file as text
            | 'Add Key-Value Pair' >> beam.ParDo(AddKeyValuePairFn(account_id))
            | 'Parse JSON' >> beam.Map(parse_json, resource_name)  # Parse each line as JSON
            | 'Write to BigQuery' >> beam.io.WriteToBigQuery(
                output_bq_table, # Destination BigQuery table
                schema=get_schema(resource_name),  # Define schema (this is what appears to come back empty when run from the template)
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,  # Append to table
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,  # Create table if it doesn't exist     
        ))

def run():

    pipe_options = PipelineOptions().view_as(MyExamplePipelineOptions)
    pipe_options.view_as(SetupOptions).save_main_session = True
    #logging.info(f"Pipeline: {pipe_options.pipeline_name}")
    logging.info(f"Output location: {pipe_options.output}")
    logging.info(f"Input location: {pipe_options.input}")
    logging.info(f"Resource: {pipe_options.resource}")
    logging.info(f"Project: {pipe_options.project_id}")
    logging.info(f"Project: {pipe_options.gcp_region}")


    options = PipelineOptions(
        runner='DirectRunner',  # Local execution
        project=pipe_options.project_id,  # Your GCP Project ID
        region=pipe_options.gcp_region,  # Your GCP region
    )

    execute_pipeline(
        options=pipe_options,
        input_gcs_path=pipe_options.input,
        output_bq_table=pipe_options.output, 
        resource_name=pipe_options.resource, 
        account_id=pipe_options.adaccount_id 
    )

logging.info("FINISHED.")

if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()
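
In case it matters, the schema constants referenced in get_schema are in one of the forms WriteToBigQuery accepts, roughly like the placeholder below (field names here are illustrative only, not my real schema):

# Placeholder schema dict, illustrative field names only:
ad_image_schema = {
    'fields': [
        {'name': 'id', 'type': 'STRING', 'mode': 'REQUIRED'},
        {'name': 'account_id', 'type': 'STRING', 'mode': 'NULLABLE'},
        {'name': 'created_time', 'type': 'TIMESTAMP', 'mode': 'NULLABLE'},
    ]
}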
