Reputation: 63
I am new to dbt and am looking to use it to build an incremental load from a "source" schema to a "staging" schema. The "source" schema holds incremental data from the source system, batched on a batch_id, and "staging" holds the merged/upserted data with one row per source primary key. The problem is that I have over 40 tables, and I don't want to manually create 40 .sql files to generate the models. Can anyone point me in the direction of dynamic model creation? I do have the following code written, but I don't know where it should go so that when I issue a dbt run, it takes the info from metadata_tbl and creates the models dynamically. (A sketch of my sources.yml declaration for the metadata table follows the code.) My schemas reside in a Snowflake database. Thanks in advance for all the suggestions.
{# OBTAIN TABLES TO BE PROCESSED FROM THE METADATA TABLE #}
{%- call statement('tgt_tbl_info', fetch_result=True) -%}
    select target_schema,
           target_table_name,
           primary_key,
           stg_schema_name
    from {{ source('admin', 'metadata_tbl') }}
    where source_system = 'TEST_INCR'
    order by target_table_name
{%- endcall -%}
{%- set tgt_tbl_name = load_result('tgt_tbl_info') -%}
{%- set tgt_tbl_name_data = tgt_tbl_name['data'] -%}
{# LOOP THROUGH THE TABLES TO INCREMENTALLY LOAD THEM #}
{% for tgt_tbl_name in tgt_tbl_name_data %}
--select {{tgt_tbl_name[0]}}, {{tgt_tbl_name[1]}}
{{
    config(
        materialized='incremental',
        unique_key=tgt_tbl_name[2]
    )
}}
{# OBTAIN MAX BATCH ID IN THE SOURCE SCHEMA #}
with src_max_batch as
(
    select {{tgt_tbl_name[2]}} as src_id,
           max(batch_id) as src_max_batch_id
    from {{tgt_tbl_name[0]}}.{{tgt_tbl_name[1]}}
    group by {{tgt_tbl_name[2]}}
),
{# OBTAIN MAX BATCH ID IN THE STAGING SCHEMA #}
stg_max_batch as
(
    select {{tgt_tbl_name[2]}} as stg_id,
           max(batch_id) as stg_max_batch_id
    from {{tgt_tbl_name[3]}}.{{tgt_tbl_name[1]}}
    group by {{tgt_tbl_name[2]}}
),
{# OBTAIN ROWS FROM SOURCE SCHEMA THAT NEED TO BE PROCESSED INTO STAGING SCHEMA #}
to_process as
(
    select src_id as process_id,
           src_max_batch_id as process_batch_id
    from src_max_batch
    left outer join stg_max_batch
        on src_id = stg_id
    where src_max_batch_id > ifnull(stg_max_batch_id, 0)
),
final as
(
    select *
    from {{tgt_tbl_name[0]}}.{{tgt_tbl_name[1]}}
    inner join to_process
        on {{tgt_tbl_name[2]}} = process_id
        and batch_id = process_batch_id
)
select *
from final
{% if is_incremental() %}
--where batch_id > (select max(batch_id) from {{ this }})
where {{tgt_tbl_name[2]}} in (select process_id from final)
{% endif %}
{% endfor %}
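For context, the metadata table is declared as a dbt source in my sources.yml, roughly like this (a minimal sketch; the schema name and column comments reflect only what the query above assumes):
version: 2
sources:
  - name: admin
    schema: admin    # assumed: the physical schema holding the metadata table
    tables:
      - name: metadata_tbl
        # columns used above: target_schema, target_table_name,
        # primary_key, stg_schema_name, source_system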
Upvotes: 3
Views: 3841
Reputation: 51
You should use the custom materializations that dbt offers. Here is an example, with a link to the dbt documentation: https://docs.getdbt.com/docs/guides/creating-new-materializations
{%- materialization my_view, default -%}

  {%- set target_relation = api.Relation.create(
        identifier=identifier, schema=schema, database=database,
        type='view') -%}

  -- ... setup database ...
  -- ... run pre-hooks...

  -- build model
  {% call statement('main') -%}
    {{ create_view_as(target_relation, sql) }}
  {%- endcall %}

  -- ... run post-hooks ...
  -- ... clean up the database...

  -- Return the relations created in this materialization
  {{ return({'relations': [target_relation]}) }}

{%- endmaterialization -%}
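Once that file is saved under your project's macros/ directory, any model can opt into the new materialization by name in its config block. A minimal sketch (the model body here is just a placeholder select):

-- models/my_model.sql (hypothetical model using the custom materialization)
{{ config(materialized='my_view') }}

select *
from {{ source('admin', 'metadata_tbl') }}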
Upvotes: 0