Keegan Kozler

Reputation: 444

Optimizing dbt Pipeline Performance for Time Series Data Imputation

I'm building a dbt pipeline with PostgreSQL. One of my initial tables contains time series data with columns 'group_id', 'timestamp', and various numeric columns.

I'd like to create a downstream model that fills each missing numeric value with the most recent non-null value from an earlier row in the same 'group_id' or, if no such row exists, with that column's average across the whole table. I need to perform this cleanup for six columns.

I've written some code that does this correctly, but my initial table has about 1.2 million records, and running dbt run -s "my-model" takes far too long: I killed the last run after an hour.

The code is below. Can you think of a more performant way to fill in these missing values? I tried to think of ways to help with an intermediate dbt model but have come up empty so far.

with avg_stats as (select avg(stat_1) as avg_stat_1,
                          avg(stat_2) as avg_stat_2,
                          ...
                    from my_db.my_table)

select stats.group_id,
       stats.timestamp,
       coalesce(
               stats.stat_1,
               coalesce(
                       (select previous.stat_1
                                from my_db.my_table previous
                                where
                                    previous.group_id = stats.group_id and
                                    previous.stat_1 is not null and
                                    previous.timestamp < stats.timestamp
                                order by previous.timestamp desc
                                limit 1),
                       avg_stat_1)
       ) as stat_1,
       coalesce(
               stats.stat_2,
               coalesce(
                       (select previous.stat_2
                                from my_db.my_table previous
                                where
                                    previous.group_id = stats.group_id and
                                    previous.stat_2 is not null and
                                    previous.timestamp < stats.timestamp
                                order by previous.timestamp desc
                                limit 1),
                       avg_stat_2)
       ) as stat_2,
       ...
from my_db.my_table stats
         cross join avg_stats;

Upvotes: 0

Views: 144

Answers (1)

Keegan Kozler

Reputation: 444

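Found a method that works quite nicely: replace the correlated subqueries with window functions. A running count of non-null values gives each null row the same id as the last non-null row before it, and first_value then picks up that row's value.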

with avg_stats as (select avg(stat_1) as avg_stat_1,
                          avg(stat_2) as avg_stat_2,
                          ...
                    from my_db.my_table),

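     partitioned as (select group_id,
                            timestamp,
                            stat_1,
                            -- running count of non-null values within each group:
                            -- a null row gets the same id as the last non-null row before it
                            sum(case when stat_1 is not null then 1 else 0 end)
                            over (partition by group_id order by timestamp) as stat_1_partition_id,
                            stat_2,
                            sum(case when stat_2 is not null then 1 else 0 end)
                            over (partition by group_id order by timestamp) as stat_2_partition_id
                     from my_db.my_table)

select group_id,
       timestamp,
       -- each (group_id, partition id) block starts with its only non-null value, if it has one;
       -- rows before a group's first non-null value fall back to the table average
       coalesce(
               first_value(stat_1)
               over (partition by group_id, stat_1_partition_id order by timestamp),
               avg_stat_1
       ) as stat_1,
       coalesce(
               first_value(stat_2)
               over (partition by group_id, stat_2_partition_id order by timestamp),
               avg_stat_2
       ) as stat_2,
       ...
from partitioned
         cross join avg_stats;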

When using this type of query as part of my dbt model with 1.2 million records and table materialization, dbt run -s my_model completed in 2.5 minutes. Good enough for me.
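
For anyone who wants to sanity-check this kind of window-function fill before running it on the full table, here is a minimal sketch on toy data. It assumes SQLite (through Python's sqlite3, which needs SQLite 3.25 or newer for window functions) as a stand-in for PostgreSQL. The table name my_table, the single column stat_1, and the helper fill_missing are made up for illustration. Both windows are partitioned by group_id here so that a value never carries over from one group into the next.

```python
import sqlite3

# Toy data: (group_id, timestamp, stat_1). None marks a missing value.
rows = [
    ("a", 1, None),   # no earlier value in group "a" -> table average
    ("a", 2, 10.0),
    ("a", 3, None),   # -> 10.0 carried forward
    ("b", 1, None),   # must not inherit 10.0 from group "a" -> table average
    ("b", 2, 20.0),
    ("b", 3, None),   # -> 20.0 carried forward
]

# Hypothetical single-column version of the fill query (stand-in table name).
FILL_SQL = """
with avg_stats as (select avg(stat_1) as avg_stat_1 from my_table),

     partitioned as (select group_id,
                            timestamp,
                            stat_1,
                            sum(case when stat_1 is not null then 1 else 0 end)
                            over (partition by group_id order by timestamp) as stat_1_partition_id
                     from my_table)

select group_id,
       timestamp,
       coalesce(
               first_value(stat_1)
               over (partition by group_id, stat_1_partition_id order by timestamp),
               avg_stat_1
       ) as stat_1
from partitioned
         cross join avg_stats
order by group_id, timestamp
"""


def fill_missing(data):
    """Load the toy rows into an in-memory SQLite table and run the fill query."""
    conn = sqlite3.connect(":memory:")
    conn.execute("create table my_table (group_id text, timestamp integer, stat_1 real)")
    conn.executemany("insert into my_table values (?, ?, ?)", data)
    result = conn.execute(FILL_SQL).fetchall()
    conn.close()
    return result


filled = fill_missing(rows)
for row in filled:
    print(row)
```

The interesting rows are the leading nulls: the first row of group "b" should get the table average (15.0 for this data) rather than the last value seen in group "a".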

Upvotes: 0
