NBaliga

Reputation: 63

Dynamic model generation based on a metadata table in dbt

I am new to dbt and am exploring it to build an incremental load from a "source" schema to a "staging" schema. The "source" schema holds incremental data from the source system, keyed by a batch_id, and "staging" holds the merged/upserted data with one row per source primary key. The problem is that I have over 40 tables, and I don't want to manually create 40 .sql files to generate the models. Can anyone point me in the direction of dynamic model creation? I have the following code written, but I don't know where it should go so that when I issue a dbt run, it takes the info from the metadata_tbl and creates the models dynamically. My schemas reside in a Snowflake DB. Thanks in advance for all the suggestions.

{# OBTAIN TABLES TO BE PROCESSED FROM THE METADATA TABLE #}
{%- call statement('tgt_tbl_info', fetch_result=True) -%}
    select target_schema,
            target_table_name,
            primary_key,
            stg_schema_name
    from {{ source('admin', 'metadata_tbl') }}
    where source_system = 'TEST_INCR'
    order by target_table_name
{%- endcall -%}

{%- set tgt_tbl_name = load_result('tgt_tbl_info') -%}

{%- set tgt_tbl_name_data = tgt_tbl_name['data']-%}


{# LOOP THROUGH THE TABLES TO INCREMENTALLY LOAD THEM #}
{% for tgt_tbl_name in tgt_tbl_name_data %}
    --select {{tgt_tbl_name[0]}}, {{tgt_tbl_name[1]}}
    {{
    config(
        materialized='incremental',
        unique_key=tgt_tbl_name[2]
    )
    }}
    
    {# OBTAIN MAX BATCH ID IN THE SOURCE SCHEMA #}
    with src_max_batch as
    (
        select {{tgt_tbl_name[2]}} as src_id, 
                max(batch_id) as src_max_batch_id
        from {{tgt_tbl_name[0]}}.{{tgt_tbl_name[1]}}
        group by {{tgt_tbl_name[2]}}
    ),

    {# OBTAIN MAX BATCH ID IN THE STAGING SCHEMA #}
    stg_max_batch as
    (
        select {{tgt_tbl_name[2]}} as stg_id, 
                max(batch_id) as stg_max_batch_id
        from {{tgt_tbl_name[3]}}.{{tgt_tbl_name[1]}}
        group by {{tgt_tbl_name[2]}}
    ),

    {# OBTAIN ROWS FROM SOURCE SCHEMA THAT NEED TO BE PROCESSED INTO STAGING SCHEMA #}
    to_process as
    (
        select src_id as process_id,
                src_max_batch_id as process_batch_id
        from src_max_batch
            left outer join stg_max_batch 
                on src_id = stg_id
        where src_max_batch_id > IFNULL(stg_max_batch_id,0)
    ),

    final as
    (
        select *
        from {{tgt_tbl_name[0]}}.{{tgt_tbl_name[1]}}
            inner join to_process
                on {{tgt_tbl_name[2]}} = process_id
                and batch_id = process_batch_id
    )

    select *
    from final

    {% if is_incremental() %}

    --where batch_id > (select max(batch_id) from {{ this }})
        where {{tgt_tbl_name[2]}} in (select id from final)

    {% endif %}

{% endfor %}
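For context on where code like this can live: dbt compiles each .sql file under models/ into exactly one model, so a single model file cannot emit 40 incremental models from a loop like the one above. One common workaround is to move the loop into a macro under macros/ and drive the merges with dbt's `run_query`, invoked via `dbt run-operation`. The sketch below is a minimal illustration of that pattern, not a drop-in solution: the macro name and file path are made up, the merge column list is a placeholder, and the incremental filter is simplified to a per-table high-water mark rather than the per-key comparison in the question.

```sql
-- macros/load_staging_tables.sql (hypothetical file name)
{% macro load_staging_tables() %}

    {% set metadata_sql %}
        select target_schema, target_table_name, primary_key, stg_schema_name
        from admin.metadata_tbl
        where source_system = 'TEST_INCR'
        order by target_table_name
    {% endset %}

    {# run_query returns an Agate table; iterate over its rows #}
    {% set tables = run_query(metadata_sql) %}

    {% for row in tables.rows %}
        {% set merge_sql %}
            merge into {{ row[3] }}.{{ row[1] }} tgt
            using (
                select *
                from {{ row[0] }}.{{ row[1] }}
                where batch_id > (
                    select ifnull(max(batch_id), 0)
                    from {{ row[3] }}.{{ row[1] }}
                )
            ) src
            on tgt.{{ row[2] }} = src.{{ row[2] }}
            when matched then
                update set tgt.batch_id = src.batch_id  -- list the real columns here
            when not matched then
                insert ({{ row[2] }}, batch_id)
                values (src.{{ row[2] }}, src.batch_id)
        {% endset %}
        {% do run_query(merge_sql) %}
        {{ log('Merged ' ~ row[1], info=True) }}
    {% endfor %}

{% endmacro %}
```

Invoked with `dbt run-operation load_staging_tables`. The trade-off is that these tables no longer appear in dbt's DAG, docs, or tests; the alternative is to generate one model file per table (for example with the codegen package or an external script) so each remains a real dbt model.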

Upvotes: 3

Views: 3841

Answers (1)

Sashko Micov

Reputation: 51

You should use custom materializations, which dbt offers. Here is an example, and a link to the dbt documentation: https://docs.getdbt.com/docs/guides/creating-new-materializations

{%- materialization my_view, default -%}

{%- set target_relation = api.Relation.create(
    identifier=identifier, schema=schema, database=database,
    type='view') -%}

-- ... setup database ...
-- ... run pre-hooks...

-- build model
{% call statement('main') -%}
{{ create_view_as(target_relation, sql) }}
{%- endcall %}

-- ... run post-hooks ...
-- ... clean up the database...

-- Return the relations created in this materialization
{{ return({'relations': [target_relation]}) }}

{%- endmaterialization -%}
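For completeness, a custom materialization like the one above is then referenced from an ordinary model file via `config()`. A minimal usage sketch (the model file name and select statement are arbitrary):

```sql
-- models/my_model.sql (hypothetical)
{{ config(materialized='my_view') }}

select 1 as id
```

Note that each model still comes from its own .sql file; the materialization controls how that model is built, not how many models a file produces.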

Upvotes: 0
