hockeyman

Reputation: 1183

Time intervals analysis in BigQuery

Introduction

I have IoT devices that constantly send data to the server. The data consists of these fields:

There is no guarantee that data arrives in the correct time order: a device may sometimes send data that was captured in the past. That rules out analyzing and enriching the data at ingestion time.

Received data is stored in a BigQuery table. Each device has a separate table. The table structure looks like this:

Requirements

After data collection, I need to analyze the data according to these rules:

Example

Let's say I receive this data from a device:

state state_valid progress timestamp
2 1 2451 20:50:00
0 1 2451 20:50:20
2 1 2451 20:52:29
3 1 2451 20:53:51
3 1 2500 20:54:20
2 0 2500 20:55:09

Below is a visualization of that data on a timeline, to make the next steps easier to follow:

[image: Data visualization]

The received data should be processed in one-minute intervals, assigning each minute the state that the device was in for the greater part of that minute. The data above then becomes:

[image: Data split into one-minute intervals]
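
As a rough sketch of this per-minute step (not a complete solution), the query below cuts each reading's span at minute boundaries, sums the seconds each state covers within every minute, and keeps the state with the largest share. It assumes, based on the example, that a row with state_valid = 0 counts as state 0, and it closes the last reading at the end of its minute instead of at CURRENT_TIMESTAMP(). The CTE names spans and per_minute are illustrative only.

```sql
WITH data AS (
  SELECT * FROM UNNEST([
    STRUCT(2 AS state, 1 AS state_valid, 2451 AS progress,
           TIMESTAMP('2022-07-01 20:50:00 UTC') AS timestamp),
    (0, 1, 2451, TIMESTAMP('2022-07-01 20:50:20 UTC')),
    (2, 1, 2451, TIMESTAMP('2022-07-01 20:52:29 UTC')),
    (3, 1, 2451, TIMESTAMP('2022-07-01 20:53:51 UTC')),
    (3, 1, 2500, TIMESTAMP('2022-07-01 20:54:20 UTC')),
    (2, 0, 2500, TIMESTAMP('2022-07-01 20:55:09 UTC'))
  ])
),
spans AS (
  -- Each reading is valid from its own timestamp until the next reading.
  -- Assumption: the last reading is closed at the end of its minute.
  SELECT
    IF(state_valid = 0, 0, state) AS state,
    timestamp AS span_start,
    IFNULL(LEAD(timestamp) OVER (ORDER BY timestamp),
           TIMESTAMP_ADD(TIMESTAMP_TRUNC(timestamp, MINUTE), INTERVAL 1 MINUTE)) AS span_end
  FROM data
),
per_minute AS (
  -- One row per (minute, state) with the seconds that state covers in that minute.
  SELECT
    minute_start,
    state,
    SUM(TIMESTAMP_DIFF(
      LEAST(span_end, TIMESTAMP_ADD(minute_start, INTERVAL 1 MINUTE)),
      GREATEST(span_start, minute_start),
      SECOND)) AS seconds_in_state
  FROM spans,
    UNNEST(GENERATE_TIMESTAMP_ARRAY(
      TIMESTAMP_TRUNC(span_start, MINUTE), span_end, INTERVAL 1 MINUTE)) AS minute_start
  WHERE minute_start < span_end  -- drop a minute that only touches the span's end
  GROUP BY minute_start, state
)
-- Keep the state with the most seconds in each minute (ties are broken arbitrarily).
SELECT minute_start, state, seconds_in_state
FROM per_minute
WHERE TRUE
QUALIFY ROW_NUMBER() OVER (PARTITION BY minute_start ORDER BY seconds_in_state DESC) = 1
ORDER BY minute_start
```

For the example data this should assign states 0, 0, 2, 2, 3, 0 to the minutes 20:50 through 20:55. Working on spans rather than on individual seconds keeps the intermediate row count small when readings are far apart.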

Then consecutive intervals with the same state value should be merged:

[image: Data grouped]
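
The merge step is a typical gaps-and-islands problem. A minimal sketch, assuming the per-minute states are already available in a hypothetical by_minute relation (hard-coded here from the example), flags every minute whose state differs from the previous one and uses a running count of those flags as group_id:

```sql
WITH by_minute AS (
  -- Hypothetical per-minute result for the example data.
  SELECT * FROM UNNEST([
    STRUCT(TIMESTAMP('2022-07-01 20:50:00 UTC') AS minute_start, 0 AS state),
    (TIMESTAMP('2022-07-01 20:51:00 UTC'), 0),
    (TIMESTAMP('2022-07-01 20:52:00 UTC'), 2),
    (TIMESTAMP('2022-07-01 20:53:00 UTC'), 2),
    (TIMESTAMP('2022-07-01 20:54:00 UTC'), 3),
    (TIMESTAMP('2022-07-01 20:55:00 UTC'), 0)
  ])
),
flagged AS (
  -- TRUE when the state changes; the first row defaults to FALSE so ids start at 0.
  SELECT *,
    IFNULL(state != LAG(state) OVER (ORDER BY minute_start), FALSE) AS is_new_group
  FROM by_minute
),
grouped AS (
  -- Running count of state changes = id of the island the minute belongs to.
  SELECT *, COUNTIF(is_new_group) OVER (ORDER BY minute_start) AS group_id
  FROM flagged
)
SELECT
  group_id,
  ANY_VALUE(state) AS state,
  MIN(minute_start) AS start_timestamp,
  TIMESTAMP_ADD(MAX(minute_start), INTERVAL 1 MINUTE) AS end_timestamp,
  60 * COUNT(*) AS duration  -- seconds; assumes no missing minutes inside a group
FROM grouped
GROUP BY group_id
ORDER BY group_id
```

The progress column is left out here on purpose, since it needs its own per-group rule.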

Result

So I need a query that follows the rules in the Requirements section and, given the data shown in the Example section, produces this result:

group_id state progress start_timestamp end_timestamp duration
0 0 0 20:50:00 20:52:00 120s
1 2 0 20:52:00 20:54:00 120s
2 3 49 20:54:00 20:55:00 60s
3 0 0 20:55:00 20:56:00 60s

Sample data

Consider these two data sets as sample data:

Sample data 1

Data:

WITH data as (
  SELECT * FROM UNNEST([
    STRUCT(NULL AS state, 0 AS state_valid, 0 as progress, CURRENT_TIMESTAMP() as timestamp),
    (2, 1, 2451, TIMESTAMP('2022-07-01 20:50:00 UTC')),
    (0, 1, 2451, TIMESTAMP('2022-07-01 20:50:20 UTC')),
    (2, 1, 2451, TIMESTAMP('2022-07-01 20:52:29 UTC')),
    (3, 1, 2451, TIMESTAMP('2022-07-01 20:53:51 UTC')),
    (3, 1, 2500, TIMESTAMP('2022-07-01 20:54:20 UTC')),
    (2, 0, 2500, TIMESTAMP('2022-07-01 20:55:09 UTC'))
  ])
  WHERE NOT state IS NULL 
)

Expected outcome:

group_id state progress start_timestamp end_timestamp duration
0 0 0 20:50:00 20:52:00 120s
1 2 0 20:52:00 20:54:00 120s
2 3 49 20:54:00 20:55:00 60s
3 0 0 20:55:00 current_timestamp current_timestamp - 20:55:00
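
The open-ended last row can be handled as a final step. As a sketch, assuming a hypothetical grouped relation that already holds the merged intervals (hard-coded here), the end of the group with the highest group_id is replaced by CURRENT_TIMESTAMP() and the duration is recomputed from the timestamps:

```sql
WITH grouped AS (
  -- Hypothetical merged intervals for sample data 1.
  SELECT * FROM UNNEST([
    STRUCT(0 AS group_id, 0 AS state,
           TIMESTAMP('2022-07-01 20:50:00 UTC') AS start_timestamp,
           TIMESTAMP('2022-07-01 20:52:00 UTC') AS end_timestamp),
    (1, 2, TIMESTAMP('2022-07-01 20:52:00 UTC'), TIMESTAMP('2022-07-01 20:54:00 UTC')),
    (2, 3, TIMESTAMP('2022-07-01 20:54:00 UTC'), TIMESTAMP('2022-07-01 20:55:00 UTC')),
    (3, 0, TIMESTAMP('2022-07-01 20:55:00 UTC'), TIMESTAMP('2022-07-01 20:56:00 UTC'))
  ])
),
open_ended AS (
  -- Only the last group stays open until now; all others keep their end.
  SELECT
    group_id, state, start_timestamp,
    IF(group_id = MAX(group_id) OVER (), CURRENT_TIMESTAMP(), end_timestamp) AS end_timestamp
  FROM grouped
)
SELECT *, TIMESTAMP_DIFF(end_timestamp, start_timestamp, SECOND) AS duration
FROM open_ended
ORDER BY group_id
```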

Sample data 2

Data:

WITH data as (
  SELECT * FROM UNNEST([
    STRUCT(NULL AS state, 0 AS state_valid, 0 as progress, CURRENT_TIMESTAMP() as timestamp),
    (2, 1, 2451, TIMESTAMP('2022-07-01 20:50:00 UTC')),
    (0, 1, 2451, TIMESTAMP('2022-07-01 20:50:20 UTC')),
    (2, 1, 2451, TIMESTAMP('2022-07-01 20:52:29 UTC')),
    (3, 1, 2451, TIMESTAMP('2022-07-01 20:53:51 UTC')),
    (3, 1, 2500, TIMESTAMP('2022-07-01 20:54:20 UTC')),
    (3, 1, 2580, TIMESTAMP('2022-07-01 20:55:09 UTC')),
    (3, 1, 2600, TIMESTAMP('2022-07-01 20:59:09 UTC')),
    (3, 1, 2700, TIMESTAMP('2022-07-01 21:20:09 UTC')),
    (2, 0, 2700, TIMESTAMP('2022-07-01 22:11:09 UTC'))
  ])
  WHERE NOT state IS NULL 
)

Expected outcome:

group_id state progress start_timestamp end_timestamp duration
0 0 0 20:50:00 20:52:00 120s
1 2 0 20:52:00 20:54:00 120s
2 3 249 20:54:00 22:11:00 4620s
3 0 0 22:11:00 current_timestamp current_timestamp - 22:11:00

Upvotes: 2

Views: 542

Answers (3)

Mikhail Berlyant

Reputation: 172993

I feel that updating the existing answer would be confusing, so here is the fixed solution. There are two fixes, both in the final select statement; they are marked with comments so you can easily locate them:

with by_second as (
  select if(state_valid = 0, 0, state) state, progress, ts, timestamp_trunc(ts, minute) ts_minute
  from (
    select *, timestamp_sub(lead(timestamp) over(order by timestamp), interval 1 second) as next_timestamp
    from your_table  
  ), unnest(generate_timestamp_array(
    timestamp, ifnull(next_timestamp, timestamp_trunc(timestamp_add(timestamp, interval 60 second), minute)), interval 1 second
  )) ts
), by_minute as (
  select ts_minute, array_agg(struct(state, progress) order by weight desc limit 1)[offset(0)].*
  from (
    select state, progress, ts_minute, count(*) weight
    from by_second
    group by state, progress, ts_minute
  )
  group by ts_minute
  having sum(weight) > 59
)
select group_id, any_value(state) state, sum(progress) progress, 
  # here changed max(progress) to sum(progress)
  min(ts_minute) start_timestamp,
  timestamp_add(max(ts_minute), interval 1 minute) end_timestamp,
  60 * count(*) duration
from (
  select countif(new_group) over(order by ts_minute) group_id, state, progress, ts_minute
  from (
    select ts_minute, state, progress - lag(progress) over(order by ts_minute) as progress,
      -- ifnull((state, progress) != lag((state, progress)) over(order by ts_minute), true) new_group, 
      # fixed this line with below one
      ifnull((state) != lag(state) over(order by ts_minute), true) new_group,
    from by_minute
  )
)
group by group_id   

Upvotes: 1

Jaytiger

Reputation: 12254

Yet another approach:

WITH preprocessing AS (
  SELECT IF (LAST_VALUE(state_valid IGNORE NULLS) OVER (ORDER BY ts) = 0, 0, state) AS state,
         LAST_VALUE(state_valid IGNORE NULLS) OVER (ORDER BY ts) AS state_valid,
         progress, ts
    FROM sample
),
intervals_added AS (
  ( SELECT *, 0 src FROM preprocessing UNION ALL
    SELECT null, null, null, ts, 1
      FROM (SELECT MIN(ts) min_ts FROM sample), (SELECT MAX(ts) max_ts FROM sample),
           UNNEST (GENERATE_TIMESTAMP_ARRAY(min_ts, max_ts + INTERVAL 1 MINUTE, INTERVAL 1 MINUTE)) ts
  ) EXCEPT DISTINCT
  SELECT null, null, null, ts, 1 FROM (SELECT ts FROM preprocessing)
),
analysis AS (
  SELECT *, SUM(grp) OVER (ORDER BY ts) AS group_id FROM (
    SELECT * EXCEPT(progress),
           TIMESTAMP_TRUNC(ts, MINUTE) AS start_timestamp,
           progress - LAST_VALUE(progress IGNORE NULLS) OVER w AS progress,
           IF (LAST_VALUE(state IGNORE NULLS) OVER w <> state, 1, 0) AS grp,
           TIMESTAMP_DIFF(LEAD(ts) OVER (ORDER BY ts, src), ts, SECOND) AS diff,
      FROM intervals_added
    WINDOW w AS (ORDER BY ts ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING)
  ) QUALIFY MAX(diff) OVER (PARTITION BY TIMESTAMP_TRUNC(ts, MINUTE)) = diff
)
SELECT group_id, MIN(state) AS state, SUM(progress) AS progress,
       MIN(start_timestamp) AS start_timestamp,
       MIN(start_timestamp) + INTERVAL COUNT(1) MINUTE AS end_timestamp,
       60 * COUNT(1) AS duration,
  FROM analysis GROUP BY 1 ORDER BY 1;

Output:

[image: query output]

Upvotes: 1

Mikhail Berlyant

Reputation: 172993

Consider the approach below:

with by_second as (
  select if(state_valid = 0, 0, state) state, progress, ts, timestamp_trunc(ts, minute) ts_minute
  from (
    select *, timestamp_sub(lead(timestamp) over(order by timestamp), interval 1 second) as next_timestamp
    from your_table  
  ), unnest(generate_timestamp_array(
    timestamp, ifnull(next_timestamp, timestamp_trunc(timestamp_add(timestamp, interval 60 second), minute)), interval 1 second
  )) ts
), by_minute as (
  select ts_minute, array_agg(struct(state, progress) order by weight desc limit 1)[offset(0)].*
  from (
    select state, progress, ts_minute, count(*) weight
    from by_second
    group by state, progress, ts_minute
  )
  group by ts_minute
  having sum(weight) > 59
)
select group_id, any_value(state) state, max(progress) progress,
  min(ts_minute) start_timestamp,
  timestamp_add(max(ts_minute), interval 1 minute) end_timestamp,
  60 * count(*) duration
from (
  select countif(new_group) over(order by ts_minute) group_id, state, progress, ts_minute
  from (
    select ts_minute, state, progress - lag(progress) over(order by ts_minute) as progress,
      ifnull((state, progress) != lag((state, progress)) over(order by ts_minute), true) new_group,
    from by_minute
  )
)
group by group_id              

If applied to the dummy data from your question:

[image: input data]

the output is:

[image: query output]

Upvotes: 2
