Reputation: 349
I need to design an Airflow pipeline to load CSVs into BigQuery.
I know the CSVs frequently have a changing schema. After loading the first file the schema might be
id | ps_1 | ps_1_value
when the second file lands and I load it it might look like
id | ps_1 | ps_1_value | ps_2 | ps_2_value
What's the best approach to handling this?
My first thought is to do this in a PythonOperator. If file 3 comes in and looks like
id | ps_2 | ps_2_value
I would fill in the missing columns and do the insert.
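Roughly, what I have in mind is something like this sketch (assuming pandas and the google-cloud-bigquery client; the function and file/table names are just placeholders):

from google.cloud import bigquery
import pandas as pd

def load_csv_aligned(csv_path, table_id):
    """Pad the incoming CSV with any columns it is missing, then append it."""
    client = bigquery.Client()
    existing_cols = [field.name for field in client.get_table(table_id).schema]

    df = pd.read_csv(csv_path)
    for col in existing_cols:
        if col not in df.columns:
            df[col] = None  # fill missing columns with NULLs

    # Columns that are new in this file would still need to be added to the
    # table's schema before this load succeeds.
    client.load_table_from_dataframe(df, table_id).result()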
Thanks for the feedback.
Upvotes: 3
Views: 5895
Reputation: 349
Edit: The light bulb moment was realizing that schema_update_options exist. See here: https://googleapis.dev/python/bigquery/latest/generated/google.cloud.bigquery.job.SchemaUpdateOption.html
After loading two prior files, example_data_1.csv and example_data_2.csv, the task below loads example_data_3.csv, and I can see that the fields are being inserted into the correct columns, with new columns being added as needed.
from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator

csv_to_bigquery = GoogleCloudStorageToBigQueryOperator(
    task_id='csv_to_bigquery',
    google_cloud_storage_conn_id='google_cloud_default',
    bucket=airflow_bucket,
    source_objects=['data/example_data_3.csv'],
    skip_leading_rows=1,
    bigquery_conn_id='google_cloud_default',
    destination_project_dataset_table='{}.{}.{}'.format(project, schema, table),
    source_format='CSV',
    create_disposition='CREATE_IF_NEEDED',
    write_disposition='WRITE_APPEND',
    # Let BigQuery add new columns and relax existing ones to NULLABLE on append
    schema_update_options=['ALLOW_FIELD_RELAXATION', 'ALLOW_FIELD_ADDITION'],
    autodetect=True,
    dag=dag
)
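For a quick sanity check, the table's schema can be inspected with the google-cloud-bigquery client (a minimal sketch, reusing the same project, schema and table variables as above):

from google.cloud import bigquery

client = bigquery.Client(project=project)
bq_table = client.get_table('{}.{}.{}'.format(project, schema, table))
print([field.name for field in bq_table.schema])
# after the third load this should also list the newly added ps_2 / ps_2_value columns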
Upvotes: 6
Reputation: 4085
Basically, the recommended pipeline for your case consists of creating a temporary table to treat your new data. Since Airflow is an orchestration tool, it's not recommended to push large flows of data through it. Given that, your DAG could be very similar to your current DAG:
1. Load the new file from GCS into a temporary table, letting BigQuery autodetect its schema.
2. Insert the data from the temporary table into your final table, using schema_update_options (sketched below). Besides that, if your actual table has fields in NULLABLE mode, it will easily deal with missing columns in case your new data has some missing field.
3. Clean up: in GCS, move your file to another bucket or directory.
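A rough sketch of those three steps (assuming the Airflow 1.x contrib operators shown in the imports; temp_table, archive_bucket and the other names are placeholders, so adapt them and the parameters to your version and setup):

from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from airflow.contrib.operators.gcs_to_gcs import GoogleCloudStorageToGoogleCloudStorageOperator

# 1. Load the new file into a temporary staging table, autodetecting its schema
load_to_temp = GoogleCloudStorageToBigQueryOperator(
    task_id='load_to_temp',
    bucket=airflow_bucket,
    source_objects=['data/example_data_3.csv'],
    destination_project_dataset_table='{}.{}.temp_table'.format(project, schema),
    source_format='CSV',
    skip_leading_rows=1,
    autodetect=True,
    write_disposition='WRITE_TRUNCATE',
    dag=dag,
)

# 2. Append the staging table to the final table; schema_update_options lets
#    BigQuery add new columns and relax missing ones to NULLABLE
append_to_final = BigQueryOperator(
    task_id='append_to_final',
    sql='SELECT * FROM `{}.{}.temp_table`'.format(project, schema),
    destination_dataset_table='{}.{}.{}'.format(project, schema, table),
    write_disposition='WRITE_APPEND',
    schema_update_options=['ALLOW_FIELD_ADDITION', 'ALLOW_FIELD_RELAXATION'],
    use_legacy_sql=False,
    dag=dag,
)

# 3. Clean up: move the processed file to another bucket/prefix
archive_file = GoogleCloudStorageToGoogleCloudStorageOperator(
    task_id='archive_file',
    source_bucket=airflow_bucket,
    source_object='data/example_data_3.csv',
    destination_bucket=archive_bucket,
    destination_object='processed/example_data_3.csv',
    move_object=True,
    dag=dag,
)

load_to_temp >> append_to_final >> archive_file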
Finally, I would like to point out some links that might be useful to you:
I hope it helps.
Upvotes: 1