I'm new to Dataflow/Apache Beam. I'm trying to set the BigQuery schema dynamically by passing resource_name to the get_schema function. When I run this locally with DirectRunner it works fine and I can see the data ingested into the BQ table. However, when I save the script as a Dataflow template and use Cloud Run Functions to load and execute the template via the Dataflow API, it fails with an error indicating that the schema is empty, presumably because get_schema is invoked before runtime and resource_name is a RuntimeValueProvider. Incidentally, hardcoding the schema instead of calling get_schema makes it work (a sketch of what I mean is included after the script below).
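To illustrate my (possibly incorrect) understanding of why this happens: because --resource is declared with add_value_provider_argument, pipe_options.resource is a ValueProvider rather than a plain string, and when the script is staged as a template the concrete value is only bound at runtime via .get(). A rough sketch of what I believe get_schema receives at template-build time (illustration only, not part of my pipeline code):

# Illustration of my understanding -- not part of the actual script.
from apache_beam.options.value_provider import RuntimeValueProvider

resource = pipe_options.resource   # a ValueProvider, not 'adImages'/'creatives'/...
print(type(resource))              # RuntimeValueProvider when staged as a template
# resource.get() raises here because the value is only bound once the job runs,
# so get_schema(resource) compares a ValueProvider object against strings,
# falls through to the else branch, and returns None -> the "empty schema" error.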
Any insight into how to set the BQ schema dynamically in a Dataflow template would be greatly appreciated. I'm also curious why it works when running locally but not when executed as a template.
Apologies in advance if I haven't provided enough info/code/error logs; happy to provide more upon request.
import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions

logger = logging.getLogger(__name__)

# Schema dicts (ad_image_schema, creatives_schema, ...), AddKeyValuePairFn and
# parse_json are defined elsewhere in the script.

# Function to determine the schema based on the API object
def get_schema(resource_name):
    logger.info('what is resource name %s', resource_name)
    if resource_name == 'adImages':
        return ad_image_schema
    elif resource_name == 'creatives':
        return creatives_schema
    elif resource_name == 'ads':
        return ads_schema
    elif resource_name == 'igposts':
        return igposts_schema
    elif resource_name == 'campaigns':
        return campaigns_schema
    else:
        logger.error(f"META API Object {resource_name} not recognized")
class MyExamplePipelineOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument(
            '--input',
            type=str,
            help='Path of the file to read from',
        )
        parser.add_value_provider_argument(
            '--output',
            type=str,
            help='Where to write the increased value props',
        )
        parser.add_value_provider_argument(
            '--resource',
            type=str,
            help='Meta API Resource Name',
        )
        parser.add_value_provider_argument(
            '--project_id',
            type=str,
            help='GCP project ID',
        )
        parser.add_value_provider_argument(
            '--gcp_region',
            type=str,
            help='GCP region',
        )
        parser.add_value_provider_argument(
            '--adaccount_id',
            type=str,
            help='Meta Ad Account ID',
        )
def execute_pipeline(
    options: PipelineOptions,
    input_gcs_path,
    output_bq_table,
    resource_name,
    account_id
):
    with beam.Pipeline(options=options) as p:
        input_data = (
            p
            | 'Read from GCS' >> beam.io.ReadFromText(input_gcs_path)  # Read JSON file as text
            | 'Add Key-Value Pair' >> beam.ParDo(AddKeyValuePairFn(account_id))
            | 'Parse JSON' >> beam.Map(parse_json, resource_name)  # Parse each line as JSON
            | 'Write to BigQuery' >> beam.io.WriteToBigQuery(
                output_bq_table,  # Destination BigQuery table
                schema=get_schema(resource_name),  # Define schema
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,  # Append to table
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,  # Create table if it doesn't exist
            )
        )
def run():
    pipe_options = PipelineOptions().view_as(MyExamplePipelineOptions)
    pipe_options.view_as(SetupOptions).save_main_session = True
    # logging.info(f"Pipeline: {pipe_options.pipeline_name}")
    logging.info(f"Output location: {pipe_options.output}")
    logging.info(f"Input location: {pipe_options.input}")
    logging.info(f"Resource: {pipe_options.resource}")
    logging.info(f"Project: {pipe_options.project_id}")
    logging.info(f"Region: {pipe_options.gcp_region}")
    options = PipelineOptions(
        runner='DirectRunner',  # Local execution
        project=pipe_options.project_id,  # Your GCP Project ID
        region=pipe_options.gcp_region  # Your GCP region
    )
    execute_pipeline(
        options=pipe_options,
        input_gcs_path=pipe_options.input,
        output_bq_table=pipe_options.output,
        resource_name=pipe_options.resource,
        account_id=pipe_options.adaccount_id
    )
    logging.info("FINISHED.")
if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()
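For reference, this is roughly what I mean by hardcoding the schema: replacing the get_schema call in WriteToBigQuery with a literal schema string lets the template run (the field list below is only a placeholder, not my real schema):

| 'Write to BigQuery' >> beam.io.WriteToBigQuery(
    output_bq_table,
    schema='ad_id:STRING,account_id:STRING,created_time:TIMESTAMP',  # placeholder fields, not my actual schema
    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
)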