I'm building a dbt pipeline with PostgreSQL. One of my initial tables contains time series data with columns 'group_id', 'timestamp', and various numeric columns.
I'd like to create a downstream model that fills each missing (null) numeric value with the most recent earlier non-null value from the same 'group_id' (ordered by 'timestamp') or, if there is none, with that column's average across the whole table. I need to do this for six columns.
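To pin the fill rule down precisely, here is a minimal pure-Python sketch of it for a single column. The function name `fill_missing`, the `(group_id, timestamp, value)` tuple layout, and the sample rows are hypothetical. Like SQL's `avg()`, it ignores missing values when computing the fallback mean.

```python
from typing import Dict, List, Optional, Tuple

Row = Tuple[str, int, Optional[float]]  # hypothetical layout: (group_id, timestamp, value)


def fill_missing(rows: List[Row]) -> List[Row]:
    """Replace each None with the latest earlier non-None value from the same
    group (ordered by timestamp), or with the mean of all non-None values if
    the group has no earlier value. Returns rows sorted by (group_id, timestamp)."""
    present = [value for _, _, value in rows if value is not None]
    # Like SQL's avg(): NULLs are ignored, and an all-NULL column gives NULL.
    mean = sum(present) / len(present) if present else None

    last_seen: Dict[str, float] = {}  # last non-None value per group so far
    filled: List[Row] = []
    for group_id, ts, value in sorted(rows, key=lambda r: (r[0], r[1])):
        if value is not None:
            last_seen[group_id] = value
        else:
            value = last_seen.get(group_id, mean)
        filled.append((group_id, ts, value))
    return filled


sample = [
    ("a", 1, 1.0), ("a", 2, None), ("a", 3, 2.0), ("a", 4, None),
    ("b", 1, None), ("b", 2, 6.0), ("b", 3, None),
]
print(fill_missing(sample))
```

Group b's first row has no earlier value in its group, so it gets the table-wide mean of the non-null values, (1.0 + 2.0 + 6.0) / 3 = 3.0.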
I've written some code that does this correctly, but my initial table has about 1.2 million records, and running dbt run -s "my-model" takes forever. I killed it after an hour last time.
The code is below. Is there a more performant way to fill in these missing values? I tried to find a way to speed it up with an intermediate dbt model but have come up empty so far.
with avg_stats as (select avg(stat_1) as avg_stat_1,
                          avg(stat_2) as avg_stat_2,
                          ...
                     from my_db.my_table)
select stats.group_id,
       stats.timestamp,
       coalesce(
           stats.stat_1,
           -- latest earlier non-null stat_1 in the same group
           (select previous.stat_1
              from my_db.my_table previous
             where previous.group_id = stats.group_id
               and previous.stat_1 is not null
               and previous.timestamp < stats.timestamp
             order by previous.timestamp desc
             limit 1),
           avg_stats.avg_stat_1
       ) as stat_1,
       coalesce(
           stats.stat_2,
           (select previous.stat_2
              from my_db.my_table previous
             where previous.group_id = stats.group_id
               and previous.stat_2 is not null
               and previous.timestamp < stats.timestamp
             order by previous.timestamp desc
             limit 1),
           avg_stats.avg_stat_2
       ) as stat_2,
       ...
  from my_db.my_table stats
 cross join avg_stats;
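To experiment with the correlated-subquery approach on a small scale, here is a sketch that runs it for a single column against an in-memory SQLite database from Python. SQLite stands in for PostgreSQL here, and the table name `my_table`, the function `run_correlated`, and the toy rows are made up. It also hints at why the approach scales poorly: the scalar subquery runs again for every missing value in every column, and each run is an ordered lookup that likely has to scan the table unless an index on (group_id, timestamp) lets the database seek directly to the right rows.

```python
import sqlite3

# Hypothetical toy data: (group_id, timestamp, stat_1), with gaps.
ROWS = [
    ("a", 1, 1.0), ("a", 2, None), ("a", 3, 2.0), ("a", 4, None),
    ("b", 1, None), ("b", 2, 6.0), ("b", 3, None),
]

CORRELATED_SQL = """
with avg_stats as (select avg(stat_1) as avg_stat_1 from my_table)
select stats.group_id,
       stats.timestamp,
       coalesce(
           stats.stat_1,
           -- one ordered lookup per missing value
           (select previous.stat_1
              from my_table previous
             where previous.group_id = stats.group_id
               and previous.stat_1 is not null
               and previous.timestamp < stats.timestamp
             order by previous.timestamp desc
             limit 1),
           avg_stats.avg_stat_1
       ) as stat_1
  from my_table stats
 cross join avg_stats
 order by stats.group_id, stats.timestamp
"""


def run_correlated(rows):
    """Load rows into a throwaway in-memory table and run the fill query."""
    conn = sqlite3.connect(":memory:")
    try:
        conn.execute(
            "create table my_table (group_id text, timestamp integer, stat_1 real)"
        )
        conn.executemany("insert into my_table values (?, ?, ?)", rows)
        return conn.execute(CORRELATED_SQL).fetchall()
    finally:
        conn.close()


for row in run_correlated(ROWS):
    print(row)
```

Note that `avg_stat_1` is only in scope because `avg_stats` is joined into the outer query; referencing a CTE column without joining the CTE is an error in both SQLite and PostgreSQL.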
Answer:
Found a method that works quite nicely: it replaces the per-row subqueries with window functions.
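with avg_stats as (select avg(stat_1) as avg_stat_1,
                          avg(stat_2) as avg_stat_2,
                          ...
                     from my_db.my_table),
partitioned as (select group_id,
                       timestamp,
                       stat_1,
                       -- running count of non-null values within the group: it goes up
                       -- at each non-null value, so that value and the nulls after it
                       -- share one partition id
                       sum(case when stat_1 is not null then 1 else 0 end)
                           over (partition by group_id order by timestamp) as stat_1_partition_id,
                       stat_2,
                       sum(case when stat_2 is not null then 1 else 0 end)
                           over (partition by group_id order by timestamp) as stat_2_partition_id,
                       ...
                  from my_db.my_table)
select group_id,
       timestamp,
       -- the first row of each partition holds its non-null value; partition 0
       -- (nulls at the start of a group) has none, so fall back to the average
       coalesce(
           first_value(stat_1)
               over (partition by group_id, stat_1_partition_id order by timestamp),
           avg_stat_1
       ) as stat_1,
       coalesce(
           first_value(stat_2)
               over (partition by group_id, stat_2_partition_id order by timestamp),
           avg_stat_2
       ) as stat_2,
       ...
  from partitioned
 cross join avg_stats;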
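The window-function version is a "gaps and islands" pattern: a running count of non-null values, restarted for each group_id, stays constant across a run of nulls and goes up at each non-null value, so each non-null value and the nulls after it share one partition id, and `first_value()` over that partition returns the value to carry forward. The sketch below runs it for one column in an in-memory SQLite database from Python; SQLite stands in for PostgreSQL, and `my_table`, `run_window`, and the toy rows are hypothetical. Two details matter for the stated requirement. First, the running count is partitioned by group_id: a count over the whole table ordered by (group_id, timestamp) lets a group that starts with nulls (group b here) share a partition with the previous group's last value and possibly inherit it (2.0 here) instead of falling back to the average. Second, `first_value()` orders by timestamp, since ordering by group_id alone leaves the row order inside a group unspecified. The sketch assumes (group_id, timestamp) is unique.

```python
import sqlite3

# Hypothetical toy data: (group_id, timestamp, stat_1), with gaps.
ROWS = [
    ("a", 1, 1.0), ("a", 2, None), ("a", 3, 2.0), ("a", 4, None),
    ("b", 1, None), ("b", 2, 6.0), ("b", 3, None),
]

WINDOW_SQL = """
with avg_stats as (select avg(stat_1) as avg_stat_1 from my_table),
partitioned as (
    select group_id,
           timestamp,
           stat_1,
           -- restarts at 0 for each group; increments at each non-null value
           sum(case when stat_1 is not null then 1 else 0 end)
               over (partition by group_id order by timestamp) as stat_1_partition_id
      from my_table
)
select group_id,
       timestamp,
       coalesce(
           -- first row of each island is its only non-null value (or null
           -- for island 0, the leading nulls of a group)
           first_value(stat_1)
               over (partition by group_id, stat_1_partition_id order by timestamp),
           avg_stat_1
       ) as stat_1
  from partitioned
 cross join avg_stats
 order by group_id, timestamp
"""


def run_window(rows):
    """Load rows into a throwaway in-memory table and run the fill query."""
    conn = sqlite3.connect(":memory:")
    try:
        conn.execute(
            "create table my_table (group_id text, timestamp integer, stat_1 real)"
        )
        conn.executemany("insert into my_table values (?, ?, ?)", rows)
        return conn.execute(WINDOW_SQL).fetchall()
    finally:
        conn.close()


for row in run_window(ROWS):
    print(row)
```

Each table is read a fixed number of times and sorted per window, rather than probed once per missing value, which is consistent with the large speedup reported below. The query uses only window functions, `coalesce`, and `cross join`, all of which PostgreSQL also supports.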
When I used this type of query in my dbt model (1.2 million records, table materialization), dbt run -s my_model completed in 2.5 minutes. Good enough for me.