Reputation: 21
I am trying to build an incremental merge model in dbt. My requirement is to exclude some columns from being updated when the dbt merge query runs.
I am using the dbt config option merge_exclude_columns, as shown below:
{{
    config(
        tags = ['model_test'],
        unique_key = 'id',
        materialized = 'incremental',
        incremental_strategy = 'merge',
        merge_exclude_columns = ['method']
    )
}}
dbt generates the following merge query for this model:
merge into target as DBT_INTERNAL_DEST
using source_temp as DBT_INTERNAL_SOURCE
on DBT_INTERNAL_SOURCE.id = DBT_INTERNAL_DEST.id
when matched then update set *
when not matched then insert *
When I run the model with this configuration, the method column is still updated in the target table.
Can anyone help me understand what is going wrong here?
Upvotes: 2
Views: 2953
Reputation: 71
Look at the macros in **/macros/materializations/incremental/strategies.sql and override their behavior as needed. For example, when I needed to insert only specific columns in a merge statement, I wrote something like this:
{% macro oracle__get_incremental_merge_sql(args_dict) %}
    {%- set dest_columns = args_dict["dest_columns"] -%}
    {%- set temp_relation = args_dict["temp_relation"] -%}
    {%- set target_relation = args_dict["target_relation"] -%}
    {%- set unique_key = args_dict["unique_key"] -%}
    {%- set dest_column_names = dest_columns | map(attribute='name') | list -%}
    {%- set dest_cols_csv = get_quoted_column_csv(model, dest_column_names) -%}
    {%- set merge_update_columns = config.get('merge_update_columns') -%}
    {%- set merge_exclude_columns = config.get('merge_exclude_columns') -%}
    {%- set merge_insert_columns = config.get('merge_insert_columns') -%}
    {%- set custom_insert = config.get('custom_insert') -%}
    {%- set incremental_predicates = args_dict["incremental_predicates"] -%}
    {%- set update_columns = get_merge_update_columns(merge_update_columns, merge_exclude_columns, dest_columns) -%}
    {%- set insert_columns = get_merge_update_columns(merge_insert_columns, merge_exclude_columns, dest_columns) -%}
    {%- if unique_key -%}
        {%- set unique_key_result = oracle_check_and_quote_unique_key_for_incremental_merge(unique_key, incremental_predicates) -%}
        {%- set unique_key_list = unique_key_result['unique_key_list'] -%}
        {%- set unique_key_merge_predicates = unique_key_result['unique_key_merge_predicates'] -%}
        merge into {{ target_relation }} DBT_INTERNAL_DEST
        using {{ temp_relation }} DBT_INTERNAL_SOURCE
        on ({{ unique_key_merge_predicates | join(' AND ') }})
        when matched then
            update set
            {% for col in update_columns if (col.upper() not in unique_key_list and col not in unique_key_list) -%}
                DBT_INTERNAL_DEST.{{ col }} = DBT_INTERNAL_SOURCE.{{ col }}{% if not loop.last %}, {% endif %}
            {% endfor -%}
        when not matched then
            {% if custom_insert -%}
                insert(
                {% for col in insert_columns -%}
                    {{ col }}{% if not loop.last %}, {% endif %}
                {% endfor -%}
                )
                values(
                {% for col in insert_columns -%}
                    DBT_INTERNAL_SOURCE.{{ col }}{% if not loop.last %}, {% endif %}
                {% endfor -%}
                )
            {%- else -%}
                insert({{ dest_cols_csv }})
                values(
                {% for col in dest_columns -%}
                    DBT_INTERNAL_SOURCE.{{ adapter.check_and_quote_identifier(col.name, model.columns) }}{% if not loop.last %}, {% endif %}
                {% endfor -%}
                )
            {% endif -%}
    {%- else -%}
        insert into {{ target_relation }} ({{ dest_cols_csv }})
        (
            select {{ dest_cols_csv }}
            from {{ temp_relation }}
        )
    {%- endif -%}
{% endmacro %}
After that, I use my custom parameters in the model configuration:
{{
    config(
        materialized='incremental',
        unique_key='uk',
        incremental_strategy='merge',
        schema='my_schema',
        merge_update_columns = ['col1', 'col2', 'col3'],
        merge_insert_columns = ['col1', 'col2', 'col3', 'col4'],
        custom_insert = true,
        tags=["my_tag"]
    )
}}
Upvotes: 1
Reputation: 1
I have it working (as designed, I assume) using dbt 1.6.5. Make sure you are on the latest version and are not overriding global macros such as get_merge_sql.
However, the generated query is:
merge into target as DBT_INTERNAL_DEST
using source_temp as DBT_INTERNAL_SOURCE
on DBT_INTERNAL_SOURCE.id = DBT_INTERNAL_DEST.id
when matched then update set <col list minus merge_exclude_columns>
when not matched then insert *
This does not actually meet my requirements (perhaps it is fine for you?), and I am looking for a way to exclude those columns from the insert branch as well.
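To make the difference between the two branches concrete, here is a minimal sketch in plain Python of the column selection described above. It is not dbt code and not how dbt implements it: the function name build_merge_sql, the exclude_from_insert flag, and the example columns id, method, and status are all hypothetical. In a real project the same idea would have to live in an overridden merge macro.

```python
def build_merge_sql(target, source, unique_key, columns,
                    exclude=(), exclude_from_insert=False):
    """Render a merge statement with explicit column lists (illustration only)."""
    excluded = {c.lower() for c in exclude}

    # Update branch: every column except the excluded ones.
    update_cols = [c for c in columns if c.lower() not in excluded]

    # Insert branch: all columns by default (the behavior reported above),
    # or the same reduced list when exclude_from_insert is set (hypothetical flag).
    insert_cols = update_cols if exclude_from_insert else list(columns)

    set_clause = ", ".join(
        f"DBT_INTERNAL_DEST.{c} = DBT_INTERNAL_SOURCE.{c}" for c in update_cols
    )
    insert_list = ", ".join(insert_cols)
    values_list = ", ".join(f"DBT_INTERNAL_SOURCE.{c}" for c in insert_cols)

    return (
        f"merge into {target} as DBT_INTERNAL_DEST\n"
        f"using {source} as DBT_INTERNAL_SOURCE\n"
        f"on DBT_INTERNAL_SOURCE.{unique_key} = DBT_INTERNAL_DEST.{unique_key}\n"
        f"when matched then update set {set_clause}\n"
        f"when not matched then insert ({insert_list}) values ({values_list})"
    )


if __name__ == "__main__":
    # Excluded column is skipped on update but still inserted.
    print(build_merge_sql("target", "source_temp", "id",
                          ["id", "method", "status"], exclude=["method"]))
    print()
    # Excluded column is skipped in both branches.
    print(build_merge_sql("target", "source_temp", "id",
                          ["id", "method", "status"], exclude=["method"],
                          exclude_from_insert=True))
```

With exclude_from_insert left at False, method is dropped from the update set but still appears in the insert list, which matches the query shown above. Setting it to True drops the column from both branches, which is the behavior I am after.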
Upvotes: 0