Reputation: 761
I am new to DBT and have previously been using Airflow for data transformations.
In Airflow there is a variable called {{ ds }}, which represents the logical date in the form YYYY-MM-DD, and {{ ds_nodash }}, which represents the logical date in the form YYYYMMDD. I can then set up a task similar to this:
my_task = BigQueryOperator(
    task_id='t_my_task',
    sql=""" SELECT * FROM my_table where my_date="{{ ds }}" """,
    destination_dataset_table='my_project.my_dataset.my_table_new${{ ds_nodash }}',
    write_disposition='WRITE_TRUNCATE',
    dag=dag
)
This means that I am running the SQL query given on the third line, and the result overwrites the table partition named on the fourth line. In the Airflow interface, if I rerun just the day "2022-01-11", it automatically overwrites the partition for that date.
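To make the two date formats concrete, here is a minimal Python sketch of how the logical date maps to the ds and ds_nodash strings and to the partition suffix after the $ sign. The helper name partition_target is hypothetical and only for illustration; Airflow renders these values itself through templating.

```python
from datetime import date
from typing import Tuple


def partition_target(table: str, logical_date: date) -> Tuple[str, str, str]:
    """Return (ds, ds_nodash, destination) for one logical date.

    Hypothetical helper: it mimics what the {{ ds }} and {{ ds_nodash }}
    templates render to, it is not part of Airflow.
    """
    ds = logical_date.isoformat()                 # YYYY-MM-DD
    ds_nodash = logical_date.strftime("%Y%m%d")   # YYYYMMDD
    # BigQuery partition decorator: table name, "$", then the partition id
    return ds, ds_nodash, f"{table}${ds_nodash}"


ds, ds_nodash, destination = partition_target(
    "my_project.my_dataset.my_table_new", date(2022, 1, 11)
)
print(ds)           # 2022-01-11
print(ds_nodash)    # 20220111
print(destination)  # my_project.my_dataset.my_table_new$20220111
```

Rerunning the task for 2022-01-11 therefore always targets the same partition, which is why WRITE_TRUNCATE replaces only that day's data.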
I am trying to figure out how to do the same in DBT.
Upvotes: 3
Views: 11608
Reputation: 2160
To achieve this behavior you need three things in your dbt model: day granularity on the partition, the insert_overwrite strategy (which overwrites whole partitions), and an is_incremental() filter.
See the slightly adapted example below (taken from here):
{{
  config(
    materialized = 'incremental',
    incremental_strategy = 'insert_overwrite',
    partition_by = {
      'field': 'session_start',
      'data_type': 'timestamp',
      'granularity': 'day'
    }
  )
}}
with events as (
  select * from {{ref('events')}}
  {% if is_incremental() %}
  -- recalculate the latest day's data + previous
  where date(event_timestamp) >= date_sub(date(_dbt_max_partition), interval 1 day)
  {% endif %}
),
... rest of model ...
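The effect of the is_incremental() filter above can be sketched in plain Python. This is only an illustration under simplifying assumptions: the function name days_to_rebuild is hypothetical, and events are reduced to their calendar day. Given the newest partition already in the target table, it returns the days whose rows pass the filter and would therefore be rebuilt on an incremental run.

```python
from datetime import date, timedelta
from typing import Iterable, List


def days_to_rebuild(event_days: Iterable[date], max_partition: date) -> List[date]:
    """Days selected by a filter like
    `date(event_timestamp) >= date_sub(date(_dbt_max_partition), interval 1 day)`.

    Hypothetical helper for illustration only.
    """
    cutoff = max_partition - timedelta(days=1)  # latest day minus one
    return sorted({d for d in event_days if d >= cutoff})


events = [
    date(2022, 1, 9),
    date(2022, 1, 10),
    date(2022, 1, 10),
    date(2022, 1, 11),
    date(2022, 1, 12),
]
# The target table's newest partition is 2022-01-11.
print(days_to_rebuild(events, date(2022, 1, 11)))
```

With these inputs the selected days are 2022-01-10, 2022-01-11 and 2022-01-12; rows from 2022-01-09 are skipped, so that older partition is left untouched.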
Upvotes: 2
Reputation: 1978
With DBT you can do that by using an incremental model.
In dbt, you describe the data you want with SQL statements, and the materialization you choose dictates how it is built in your warehouse.
With incremental models you basically have a big table into which you insert new rows. This materialization lets you add rules such as "insert rows from table source_table where timestamp >= today".
In your case, with DBT + BigQuery, you have two options: merge or insert_overwrite. From your description you'll want to use the latter.
You'll need to include this at the beginning of your model:
{{
config(
materialized='incremental',
incremental_strategy='insert_overwrite',
...
)
}}
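The difference between the two strategies can be sketched with in-memory rows. This is a simplified Python model, not dbt's actual implementation: the function names and the id, day and value fields are made up for illustration, and merge is assumed to be configured with a unique key.

```python
from typing import Dict, List

Row = Dict[str, object]


def merge(target: List[Row], new_rows: List[Row], unique_key: str) -> List[Row]:
    """Update rows whose key already exists, insert the rest (simplified)."""
    by_key = {row[unique_key]: row for row in target}
    for row in new_rows:
        by_key[row[unique_key]] = row
    return list(by_key.values())


def insert_overwrite(target: List[Row], new_rows: List[Row], partition_field: str) -> List[Row]:
    """Drop every partition present in new_rows, then insert new_rows (simplified)."""
    replaced = {row[partition_field] for row in new_rows}
    kept = [row for row in target if row[partition_field] not in replaced]
    return kept + list(new_rows)


target = [
    {"id": 1, "day": "2022-01-11", "value": "old"},
    {"id": 2, "day": "2022-01-11", "value": "stale"},
    {"id": 3, "day": "2022-01-10", "value": "keep"},
]
new_rows = [{"id": 1, "day": "2022-01-11", "value": "new"}]

merged = merge(target, new_rows, unique_key="id")
overwritten = insert_overwrite(target, new_rows, partition_field="day")
print(len(merged), len(overwritten))
```

In this model merge keeps the stale row with id 2, while insert_overwrite replaces the whole 2022-01-11 partition, which is the behavior closest to rerunning a single day with WRITE_TRUNCATE in Airflow.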
For reference you can go there and there.
Upvotes: 3