JW2

Reputation: 349

Airflow Pipeline CSV to BigQuery with Schema Changes

Background

I need to design an Airflow pipeline to load CSVs into BigQuery.

The CSVs frequently have a changing schema. After loading the first file, the schema might be

id | ps_1 | ps_1_value

but when the second file lands and I load it, it might look like

id | ps_1 | ps_1_value | ps_2 | ps_2_value

Question

What's the best approach to handling this?


My first thought on approaching this would be:

  1. Load the second file
  2. Compare the schema against the current table
  3. Update the table, adding two columns (ps_2, ps_2_value)
  4. Insert the new rows

I would do this in a PythonOperator.

If file 3 comes in looking like id | ps_2 | ps_2_value, I would fill in the missing columns and do the insert.
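Roughly, I'm imagining the compare-and-update step looking like the sketch below (assuming the google-cloud-bigquery client, that new columns can be added as NULLABLE STRING, and placeholder table names):

from google.cloud import bigquery

client = bigquery.Client()
table = client.get_table('my-project.my_dataset.my_table')  # placeholder table

# Header of the newly landed file (file 2 in the example above)
csv_columns = ['id', 'ps_1', 'ps_1_value', 'ps_2', 'ps_2_value']

# Any header not already in the table becomes a new NULLABLE column
existing = {field.name for field in table.schema}
new_fields = [
    bigquery.SchemaField(name, 'STRING', mode='NULLABLE')
    for name in csv_columns
    if name not in existing
]

# Widen the table before inserting the new rows
if new_fields:
    table.schema = list(table.schema) + new_fields
    client.update_table(table, ['schema'])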

Thanks for the feedback.

Upvotes: 3

Views: 5895

Answers (2)

JW2

Reputation: 349

After loading two prior files, example_data_1.csv and example_data_2.csv, I can see that the fields are being inserted into the correct columns, with new columns being added as needed.

Edit: The light-bulb moment was realizing that schema_update_options exists. See here: https://googleapis.dev/python/bigquery/latest/generated/google.cloud.bigquery.job.SchemaUpdateOption.html

# Airflow 1.x contrib import path for this operator
from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator

csv_to_bigquery = GoogleCloudStorageToBigQueryOperator(
    task_id='csv_to_bigquery',
    google_cloud_storage_conn_id='google_cloud_default',
    bucket=airflow_bucket,
    source_objects=['data/example_data_3.csv'],
    skip_leading_rows=1,
    bigquery_conn_id='google_cloud_default',
    destination_project_dataset_table='{}.{}.{}'.format(project, schema, table),
    source_format='CSV',
    create_disposition='CREATE_IF_NEEDED',
    write_disposition='WRITE_APPEND',
    # Let BigQuery add new columns and relax required fields on append
    schema_update_options=['ALLOW_FIELD_RELAXATION', 'ALLOW_FIELD_ADDITION'],
    # Infer the schema from the CSV header instead of supplying one
    autodetect=True,
    dag=dag
)


Upvotes: 6

rmesteves

Reputation: 4085

Basically, the recommended pipeline for your case consists of creating a temporary table to treat your new data. Since Airflow is an orchestration tool, it's not recommended to push large flows of data through it.

Given that, your DAG could look very similar to your current one:

  1. Load the new file to a temporary table
  2. Compare the actual table's schema against the temporary table's schema.
  3. Run a query to move the data from the temporary table to the actual table. If the temporary table has new fields, add them to the actual table using the schema_update_options parameter. Additionally, if your actual table's fields are in NULLABLE mode, it can easily handle the case where your new data is missing some fields (see the sketch after this list).
  4. Delete your temporary table
  5. If you're using GCS, move your file to another bucket or directory.
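A minimal sketch of steps 3-5 with the Python clients for BigQuery and Cloud Storage could look like this (all table, bucket, and object names below are placeholders):

from google.cloud import bigquery, storage

bq = bigquery.Client()

# Step 3: append the temporary table's rows to the actual table,
# letting BigQuery add any new columns on the fly
job_config = bigquery.QueryJobConfig(
    destination='my-project.my_dataset.actual_table',
    write_disposition='WRITE_APPEND',
    schema_update_options=[
        bigquery.SchemaUpdateOption.ALLOW_FIELD_ADDITION,
        bigquery.SchemaUpdateOption.ALLOW_FIELD_RELAXATION,
    ],
)
bq.query(
    'SELECT * FROM `my-project.my_dataset.temp_table`',
    job_config=job_config,
).result()

# Step 4: drop the temporary table
bq.delete_table('my-project.my_dataset.temp_table', not_found_ok=True)

# Step 5: move the processed file to another bucket
gcs = storage.Client()
src_bucket = gcs.bucket('my-airflow-bucket')
blob = src_bucket.blob('data/example_data_3.csv')
src_bucket.copy_blob(blob, gcs.bucket('my-processed-bucket'))
blob.delete()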

Finally, I would like to point out some links that might be useful to you:

  1. Airflow documentation (BigQuery operators)
  2. An article that shows a problem similar to yours, where you can find some of the information mentioned above.

I hope this helps.

Upvotes: 1
