Paolo P.

Reputation: 31

DBT: adjust snapshot to handle late arriving data

I'm new to dbt and I am trying to solve a "late-arriving data" situation for a snapshot model. I was wondering if any of you could help me tackle this issue effectively.

The situation is the following: every once in a while, I read some data "in full" from a source table, which looks like this:

SOURCE TABLE AT T0

id value id_queue
A 10 1

SOURCE TABLE AT T1

id value id_queue
A 25 4

Changes are recorded into a snapshot table:

{% snapshot mysnapshottable %}

{{
 config(
 target_schema=env_var('MY_SCHEMA'),
 strategy='check',
 unique_key='ID',
 check_cols=['VALUE', 'ID_QUEUE'],
 )
}} 
select * from {{ ref('source_table') }}

{% endsnapshot %}

Now, the problem is this: ID_QUEUE is a monotonically increasing number that is used to keep things ordered in case of problems with the source table writer. For instance, if a piece of data is updated twice, it may happen that the first change (the less recent one) is recorded into the source AFTER the most recent one. For example, after the two insertions shown above, the snapshot table looks like this:

id value id_queue dbt_valid_from dbt_valid_to
A 10 1 T0 T1
A 25 4 T1 NULL

Now, suppose that, for whatever reason, the source table contains at T2 an earlier "change" for the row with ID A:

SOURCE TABLE AT T2

id value id_queue
A 20 3

The result of the snapshot run will be:

id value id_queue dbt_valid_from dbt_valid_to
A 10 1 T0 T1
A 25 4 T1 T2
A 20 3 T2 NULL

While the desired output should be:

id value id_queue dbt_valid_from dbt_valid_to
A 10 1 T0 T1
A 20 3 T1 T2
A 25 4 T2 NULL

Is there a smart way/trick to achieve this result? Thanks in advance for any suggestions.

Upvotes: 3

Views: 1044

Answers (1)

tconbeer

Reputation: 5815

A snapshot is just like any other table -- you can build a model off of it and do whatever you want with the data.

You didn't tag your rdbms, but assuming you're on Snowflake or pg/similar, I would use the rank() window function on both the id_queue and dbt_valid_from fields, then join the table to itself to rearrange the dates. That looks like this:

with
    ranked as (
        select
            *,
            rank() over (
                partition by id order by id_queue asc, dbt_valid_from asc
            ) as id_rank,
            rank() over (
                partition by id order by dbt_valid_from asc, id_queue asc
            ) as ts_rank
        from {{ ref("mysnapshottable") }}
    )
select a.id, a.value, a.id_queue, b.dbt_valid_from, b.dbt_valid_to
from ranked as a
join ranked as b on a.id = b.id and a.id_rank = b.ts_rank
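
To sanity-check the query against the example in the question, here is a minimal sketch that runs it in an in-memory SQLite database from Python. This is only an illustration, not a dbt model: the ref() call is swapped for a plain table name, T0/T1/T2 are stored as strings standing in for real timestamps, an `order by` is added so the output is deterministic, and it assumes your Python ships SQLite 3.25 or newer (needed for window functions). The `rerank` helper is a made-up name.

```python
import sqlite3

# Snapshot rows from the question after the T2 run.
# "T0".."T2" are placeholder strings standing in for real timestamps.
SNAPSHOT_ROWS = [
    ("A", 10, 1, "T0", "T1"),
    ("A", 25, 4, "T1", "T2"),
    ("A", 20, 3, "T2", None),
]

# Same query as in the answer, with the dbt ref() replaced by a plain
# table name and an ORDER BY added for deterministic output.
RERANK_SQL = """
with
    ranked as (
        select
            *,
            rank() over (
                partition by id order by id_queue asc, dbt_valid_from asc
            ) as id_rank,
            rank() over (
                partition by id order by dbt_valid_from asc, id_queue asc
            ) as ts_rank
        from mysnapshottable
    )
select a.id, a.value, a.id_queue, b.dbt_valid_from, b.dbt_valid_to
from ranked as a
join ranked as b on a.id = b.id and a.id_rank = b.ts_rank
order by a.id, b.dbt_valid_from
"""


def rerank(rows):
    """Load rows into a throwaway SQLite table and run the reranking query."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "create table mysnapshottable "
        "(id text, value integer, id_queue integer, "
        "dbt_valid_from text, dbt_valid_to text)"
    )
    conn.executemany("insert into mysnapshottable values (?, ?, ?, ?, ?)", rows)
    result = conn.execute(RERANK_SQL).fetchall()
    conn.close()
    return result


if __name__ == "__main__":
    for row in rerank(SNAPSHOT_ROWS):
        print(row)
```

On the three example rows this pairs the values ordered by id_queue (10, 20, 25) with the validity windows ordered by dbt_valid_from (T0, T1, T2), which is the desired output from the question.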

The query above assumes that the combination of id_queue and dbt_valid_from is unique for any id; you should write a test to confirm that.
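
One way to check that uniqueness assumption is a query that returns every (id, id_queue, dbt_valid_from) combination occurring more than once. The sketch below runs such a query in an in-memory SQLite database from Python purely for illustration; `find_duplicates` is a hypothetical helper and the timestamps are placeholder strings. In a dbt project the same select, pointed at the snapshot via ref(), could probably be saved as a singular test, since those pass when the query returns no rows.

```python
import sqlite3

# Returns one row per (id, id_queue, dbt_valid_from) combination that
# appears more than once; an empty result means the assumption holds.
DUPLICATES_SQL = """
select id, id_queue, dbt_valid_from, count(*) as n
from mysnapshottable
group by id, id_queue, dbt_valid_from
having count(*) > 1
order by id, id_queue, dbt_valid_from
"""


def find_duplicates(rows):
    """Load snapshot rows into a throwaway SQLite table and list duplicate keys."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "create table mysnapshottable "
        "(id text, value integer, id_queue integer, "
        "dbt_valid_from text, dbt_valid_to text)"
    )
    conn.executemany("insert into mysnapshottable values (?, ?, ?, ?, ?)", rows)
    result = conn.execute(DUPLICATES_SQL).fetchall()
    conn.close()
    return result


if __name__ == "__main__":
    # Rows from the question: every combination is unique, so nothing is printed.
    clean = [
        ("A", 10, 1, "T0", "T1"),
        ("A", 25, 4, "T1", "T2"),
        ("A", 20, 3, "T2", None),
    ]
    for row in find_duplicates(clean):
        print(row)
```

If this returns any rows, the two rank() columns in the reranking query can tie, and the self-join will then duplicate or drop rows for that id.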

Upvotes: 1
