sakshi
sakshi

Reputation: 21

merge_exclude_columns in dbt incremental merge strategy not working as expected

I am trying to build an incremental merge model using dbt. My requirement is to exclude some columns from being updated when the dbt merge query runs.

I am using the dbt configuration, merge_exclude_columns like below,

{{
        config(
            tags = ['model_test'],
            unique_key = 'id',
            materialized = 'incremental',
            incremental_strategy = 'merge',
            merge_exclude_columns = ['method']
        )
    }}

The merge query being generated by dbt for this,

merge into target as DBT_INTERNAL_DEST
      using source_temp as DBT_INTERNAL_SOURCE
      on DBT_INTERNAL_SOURCE.id = DBT_INTERNAL_DEST.id
      when matched then update set *
      when not matched then insert *

When I run the model using this configuration, I still see the method field being updated in the target table.

Can anyone help me understand what exactly is going wrong over here?

Upvotes: 2

Views: 2953

Answers (2)

Roman S
Roman S

Reputation: 71

You should look at the macro **/macros/materializations/incremental/strategies.sql and override its behavior as you wish. For example, when I needed to insert only specified fields into a merge statement, I wrote something like this

{% macro oracle__get_incremental_merge_sql(args_dict) %}
{%- set dest_columns = args_dict["dest_columns"] -%}
{%- set temp_relation = args_dict["temp_relation"] -%}
{%- set target_relation = args_dict["target_relation"] -%}
{%- set unique_key = args_dict["unique_key"] -%}
{%- set dest_column_names = dest_columns | map(attribute='name') | list -%}
{%- set dest_cols_csv = get_quoted_column_csv(model, dest_column_names)  -%}
{%- set merge_update_columns = config.get('merge_update_columns') -%}
{%- set merge_exclude_columns = config.get('merge_exclude_columns') -%}
{%- set merge_insert_columns = config.get('merge_insert_columns') -%}
{%- set custom_insert = config.get('custom_insert') -%}
{%- set incremental_predicates = args_dict["incremental_predicates"] -%}
{%- set update_columns = get_merge_update_columns(merge_update_columns, merge_exclude_columns, dest_columns) -%}
{%- set insert_columns = get_merge_update_columns(merge_insert_columns, merge_exclude_columns, dest_columns) -%}
{%- if unique_key -%}
    {%- set unique_key_result = oracle_check_and_quote_unique_key_for_incremental_merge(unique_key, incremental_predicates) -%}
    {%- set unique_key_list = unique_key_result['unique_key_list'] -%}
    {%- set unique_key_merge_predicates = unique_key_result['unique_key_merge_predicates'] -%}
    merge into {{ target_relation }} DBT_INTERNAL_DEST
      using {{ temp_relation }} DBT_INTERNAL_SOURCE
      on ({{ unique_key_merge_predicates | join(' AND ') }})
        when matched then
      update set
      {% for col in update_columns if (col.upper() not in unique_key_list and col not in unique_key_list) -%}
        DBT_INTERNAL_DEST.{{ col }} = DBT_INTERNAL_SOURCE.{{ col }}{% if not loop.last %}, {% endif %}
      {% endfor -%}
    when not matched then
    {% if custom_insert -%}
      insert(
         {% for col in insert_columns -%}
          {{ col }}{% if not loop.last %}, {% endif %}
        {% endfor -%}
      )
      values(
        {% for col in insert_columns -%}
          DBT_INTERNAL_SOURCE.{{ col }}{% if not loop.last %}, {% endif %}
        {% endfor -%}
      )
    {%- else -%}
      insert({{ dest_cols_csv }})
      values(
        {% for col in dest_columns -%}
          DBT_INTERNAL_SOURCE.{{ adapter.check_and_quote_identifier(col.name, model.columns) }}{% if not loop.last %}, {% endif %}
        {% endfor -%}
      )
    {% endif -%}
{%- else -%}
insert into {{ target_relation }} ({{ dest_cols_csv }})
(
   select {{ dest_cols_csv }}
   from {{ temp_relation }}
)
{%- endif -%}{% endmacro %}

And after that I use my custom parameter in the model configuration:

{{
config(
    materialized='incremental',
    unique_key='uk',
    incremental_strategy='merge',
    schema='my_schema',
    merge_update_columns = ['col1', 'col2', 'col3'],
    merge_insert_columns = ['col1', 'col2', 'col3', 'col4'],
    custom_insert = true,
    tags=["my_tag"]
)}}

Upvotes: 1

inovitae
inovitae

Reputation: 1

I have it working (as designed I assume) using dbt 1.6.5 (make sure you have the latest version and are not overwriting the global macros like get_merge_sql).

However the result is:

    merge into target as DBT_INTERNAL_DEST
    using source_temp as DBT_INTERNAL_SOURCE
    on DBT_INTERNAL_SOURCE.id = DBT_INTERNAL_DEST.id
    when matched then update set <col list minus merge_exclude_columns>
    when not matched then insert *

This does actually not meet my requirements (perhaps it is ok for you?) and I am looking for a way to exclude them also from the insert branch.

Upvotes: 0

Related Questions