Reputation: 1183
I have IoT devices that are constantly sending data to the server. The data consists of these fields:

- state
- state_valid
- progress
- timestamp

There is no guarantee that data will be received in correct time order: sometimes a device might send data captured in the past, which removes the option to analyze and enrich data at the time of ingestion.
Received data is stored in a BigQuery table; each device has a separate table. The table structure looks like this:

- state: INTEGER, REQUIRED
- state_valid: BOOLEAN, NULLABLE
- progress: INTEGER, REQUIRED
- timestamp: TIMESTAMP, REQUIRED

After data collection, I need to analyze the data adhering to these rules:
- The device is considered to be in a state value until a different state is received;
- If state_valid is false, the state value should be ignored and 0 should be used instead of it;
- If state_valid is NULL, the last received state_valid value should be used;
- Data should be analyzed in one-minute intervals, assigning each minute the state the device was in for the better part of that minute; e.g. if the device was in state 0 from 20:51:01 to 20:51:18 and in state 2 from 20:51:18 to 20:52:12, the minute 20:51:00 to 20:51:59 should be marked as state 2;
- Consecutive intervals with the same state value should be merged and represented as one record with start and end timestamps;
- Each state interval should have a calculated progress difference (max_progress - min_progress).
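A minimal sketch of how the two state_valid rules could be applied, on inline dummy data (LAST_VALUE ... IGNORE NULLS does the forward-fill; defaulting to valid before any state_valid has arrived is an assumption):

SELECT
  timestamp,
  -- forward-fill state_valid, then zero out state while it is false;
  -- TRUE as the default before any state_valid arrives is an assumption
  IF(IFNULL(LAST_VALUE(state_valid IGNORE NULLS) OVER (ORDER BY timestamp), TRUE), state, 0) AS effective_state
FROM UNNEST([
  STRUCT(2 AS state, TRUE AS state_valid, TIMESTAMP '2022-07-01 20:50:00 UTC' AS timestamp),
  (3, NULL, TIMESTAMP '2022-07-01 20:50:10 UTC'),  -- NULL inherits the last received state_valid
  (1, FALSE, TIMESTAMP '2022-07-01 20:50:20 UTC')  -- invalid: state treated as 0
])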
Let's say I receive this data from the device:
state | state_valid | progress | timestamp |
---|---|---|---|
2 | 1 | 2451 | 20:50:00 |
0 | 1 | 2451 | 20:50:20 |
2 | 1 | 2451 | 20:52:29 |
3 | 1 | 2451 | 20:53:51 |
3 | 1 | 2500 | 20:54:20 |
2 | 0 | 2500 | 20:55:09 |
Below I provide a visualization of that data on a timeline to better understand the next procedures:
So the received data should be processed in one-minute intervals, assigning each minute the state that the device was in for the better part of that minute. So the above data becomes:

minute | state |
---|---|
20:50:00 | 0 |
20:51:00 | 0 |
20:52:00 | 2 |
20:53:00 | 2 |
20:54:00 | 3 |
20:55:00 | 0 |
Then, consecutive intervals of the same state value should be merged; a sketch of that merge step on toy data is below.
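The merge itself is a classic gaps-and-islands grouping; a minimal sketch on toy per-minute data (column names are made up, this is not the full solution):

SELECT state, MIN(ts) AS start_ts, TIMESTAMP_ADD(MAX(ts), INTERVAL 1 MINUTE) AS end_ts
FROM (
  -- running count of state changes yields a group id per island
  SELECT state, ts, COUNTIF(new_group) OVER (ORDER BY ts) AS grp
  FROM (
    SELECT state, ts, IFNULL(state != LAG(state) OVER (ORDER BY ts), TRUE) AS new_group
    FROM UNNEST([
      STRUCT(0 AS state, TIMESTAMP '2022-07-01 20:50:00 UTC' AS ts),
      (0, TIMESTAMP '2022-07-01 20:51:00 UTC'),
      (2, TIMESTAMP '2022-07-01 20:52:00 UTC')
    ])
  )
)
GROUP BY grp, state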
So, I need a query that, adhering to the requirements described in the Requirements section and given the data shown in the Example section, provides this result:
group_id | state | progress | start_timestamp | end_timestamp | duration |
---|---|---|---|---|---|
0 | 0 | 0 | 20:50:00 | 20:52:00 | 120s |
1 | 2 | 0 | 20:52:00 | 20:54:00 | 120s |
2 | 3 | 49 | 20:54:00 | 20:55:00 | 60s |
3 | 0 | 0 | 20:55:00 | 20:56:00 | 60s |
Consider these two data sets as sample data:
Data:
WITH data as (
SELECT * FROM UNNEST([
-- the first struct only names and types the columns; the WHERE below filters it out
STRUCT(NULL AS state, 0 AS state_valid, 0 as progress, CURRENT_TIMESTAMP() as timestamp),
(2, 1, 2451, TIMESTAMP('2022-07-01 20:50:00 UTC')),
(0, 1, 2451, TIMESTAMP('2022-07-01 20:50:20 UTC')),
(2, 1, 2451, TIMESTAMP('2022-07-01 20:52:29 UTC')),
(3, 1, 2451, TIMESTAMP('2022-07-01 20:53:51 UTC')),
(3, 1, 2500, TIMESTAMP('2022-07-01 20:54:20 UTC')),
(2, 0, 2500, TIMESTAMP('2022-07-01 20:55:09 UTC'))
])
WHERE NOT state IS NULL
)
Expected outcome:
group_id | state | progress | start_timestamp | end_timestamp | duration |
---|---|---|---|---|---|
0 | 0 | 0 | 20:50:00 | 20:52:00 | 120s |
1 | 2 | 0 | 20:52:00 | 20:54:00 | 120s |
2 | 3 | 49 | 20:54:00 | 20:55:00 | 60s |
3 | 0 | 0 | 20:55:00 | current_timestamp | current_timestamp - 20:55:00 |
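The duration of that last, still-open group is just the difference between now and its start, e.g.:

SELECT TIMESTAMP_DIFF(CURRENT_TIMESTAMP(), TIMESTAMP '2022-07-01 20:55:00 UTC', SECOND) AS duration_s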
Data:
WITH data as (
SELECT * FROM UNNEST([
STRUCT(NULL AS state, 0 AS state_valid, 0 as progress, CURRENT_TIMESTAMP() as timestamp),
(2, 1, 2451, TIMESTAMP('2022-07-01 20:50:00 UTC')),
(0, 1, 2451, TIMESTAMP('2022-07-01 20:50:20 UTC')),
(2, 1, 2451, TIMESTAMP('2022-07-01 20:52:29 UTC')),
(3, 1, 2451, TIMESTAMP('2022-07-01 20:53:51 UTC')),
(3, 1, 2500, TIMESTAMP('2022-07-01 20:54:20 UTC')),
(3, 1, 2580, TIMESTAMP('2022-07-01 20:55:09 UTC')),
(3, 1, 2600, TIMESTAMP('2022-07-01 20:59:09 UTC')),
(3, 1, 2700, TIMESTAMP('2022-07-01 21:20:09 UTC')),
(2, 0, 2700, TIMESTAMP('2022-07-01 22:11:09 UTC'))
])
WHERE NOT state IS NULL
)
Expected outcome:
group_id | state | progress | start_timestamp | end_timestamp | duration |
---|---|---|---|---|---|
0 | 0 | 0 | 20:50:00 | 20:52:00 | 120s |
1 | 2 | 0 | 20:52:00 | 20:54:00 | 120s |
2 | 3 | 249 | 20:54:00 | 22:11:00 | 4620s |
3 | 0 | 0 | 22:11:00 | current_timestamp | current_timestamp - 22:11:00 |
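Sanity check on the progress column: within group 2 the readings' progress runs from 2451 up to 2700, so max_progress - min_progress = 2700 - 2451 = 249, matching the expected row.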
Upvotes: 2
Views: 542
Reputation: 172993
For some reason I feel that updating the existing answer would be confusing - so see the fixed solution here. There are two fixes, in two lines of the very final select statement - they are commented so you can easily locate them:
with by_second as (
select if(state_valid = 0, 0, state) state, progress, ts, timestamp_trunc(ts, minute) ts_minute
from (
select *, timestamp_sub(lead(timestamp) over(order by timestamp), interval 1 second) as next_timestamp
from your_table
), unnest(generate_timestamp_array(
timestamp, ifnull(next_timestamp, timestamp_trunc(timestamp_add(timestamp, interval 60 second), minute)), interval 1 second
)) ts
), by_minute as (
select ts_minute, array_agg(struct(state, progress) order by weight desc limit 1)[offset(0)].*
from (
select state, progress, ts_minute, count(*) weight
from by_second
group by state, progress, ts_minute
)
group by ts_minute
having sum(weight) > 59
)
select group_id, any_value(state) state, sum(progress) progress,
# here changed max(progress) to sum(progress)
min(ts_minute) start_timestamp,
timestamp_add(max(ts_minute), interval 1 minute) end_timestamp,
60 * count(*) duration
from (
select countif(new_group) over(order by ts_minute) group_id, state, progress, ts_minute
from (
select ts_minute, state, progress - lag(progress) over(order by ts_minute) as progress,
-- ifnull((state, progress) != lag((state, progress)) over(order by ts_minute), true) new_group,
# fixed this line with below one
ifnull((state) != lag(state) over(order by ts_minute), true) new_group,
from by_minute
)
)
group by group_id
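To test the query without a real table, your_table can be shadowed by a CTE holding sample rows; a sketch with just two rows (an unqualified your_table reference resolves to the CTE):

with your_table as (
  select 2 as state, 1 as state_valid, 2451 as progress, timestamp '2022-07-01 20:50:00 UTC' as timestamp
  union all
  select 0, 1, 2451, timestamp '2022-07-01 20:50:20 UTC'
)
-- append the answer's by_second / by_minute CTEs to this with clause
-- and keep its final select; your_table then resolves to this CTE
select * from your_table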
Upvotes: 1
Reputation: 12254
Yet another approach:
WITH preprocessing AS (
SELECT IF (LAST_VALUE(state_valid IGNORE NULLS) OVER (ORDER BY ts) = 0, 0, state) AS state,
LAST_VALUE(state_valid IGNORE NULLS) OVER (ORDER BY ts) AS state_valid,
progress, ts
FROM sample
),
intervals_added AS (
( SELECT *, 0 src FROM preprocessing UNION ALL
SELECT null, null, null, ts, 1
FROM (SELECT MIN(ts) min_ts FROM sample), (SELECT MAX(ts) max_ts FROM sample),
UNNEST (GENERATE_TIMESTAMP_ARRAY(min_ts, max_ts + INTERVAL 1 MINUTE, INTERVAL 1 MINUTE)) ts
) EXCEPT DISTINCT
SELECT null, null, null, ts, 1 FROM (SELECT ts FROM preprocessing)
),
analysis AS (
SELECT *, SUM(grp) OVER (ORDER BY ts) AS group_id FROM (
SELECT * EXCEPT(progress),
TIMESTAMP_TRUNC(ts, MINUTE) AS start_timestamp,
progress - LAST_VALUE(progress IGNORE NULLS) OVER w AS progress,
IF (LAST_VALUE(state IGNORE NULLS) OVER w <> state, 1, 0) AS grp,
TIMESTAMP_DIFF(LEAD(ts) OVER (ORDER BY ts, src), ts, SECOND) AS diff,
FROM intervals_added
WINDOW w AS (ORDER BY ts ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING)
) QUALIFY MAX(diff) OVER (PARTITION BY TIMESTAMP_TRUNC(ts, MINUTE)) = diff
)
SELECT group_id, MIN(state) AS state, SUM(progress) AS progress,
MIN(start_timestamp) AS start_timestamp,
MIN(start_timestamp) + INTERVAL COUNT(1) MINUTE AS end_timestamp,
60 * COUNT(1) AS duration,
FROM analysis GROUP BY 1 ORDER BY 1;
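A note on the intervals_added step: it unions in a one-minute grid built with GENERATE_TIMESTAMP_ARRAY, and EXCEPT DISTINCT then drops grid points that coincide with real readings. The grid on its own looks like this (boundaries hard-coded for illustration):

SELECT ts
FROM UNNEST(GENERATE_TIMESTAMP_ARRAY(
  TIMESTAMP '2022-07-01 20:50:00 UTC',
  TIMESTAMP '2022-07-01 20:56:00 UTC',
  INTERVAL 1 MINUTE)) AS ts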
output:
Upvotes: 1
Reputation: 172993
Consider below approach
with by_second as (
select if(state_valid = 0, 0, state) state, progress, ts, timestamp_trunc(ts, minute) ts_minute
from (
select *, timestamp_sub(lead(timestamp) over(order by timestamp), interval 1 second) as next_timestamp
from your_table
), unnest(generate_timestamp_array(
timestamp, ifnull(next_timestamp, timestamp_trunc(timestamp_add(timestamp, interval 60 second), minute)), interval 1 second
)) ts
), by_minute as (
select ts_minute, array_agg(struct(state, progress) order by weight desc limit 1)[offset(0)].*
from (
select state, progress, ts_minute, count(*) weight
from by_second
group by state, progress, ts_minute
)
group by ts_minute
having sum(weight) > 59
)
select group_id, any_value(state) state, max(progress) progress,
min(ts_minute) start_timestamp,
timestamp_add(max(ts_minute), interval 1 minute) end_timestamp,
60 * count(*) duration
from (
select countif(new_group) over(order by ts_minute) group_id, state, progress, ts_minute
from (
select ts_minute, state, progress - lag(progress) over(order by ts_minute) as progress,
ifnull((state, progress) != lag((state, progress)) over(order by ts_minute), true) new_group,
from by_minute
)
)
group by group_id
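The by_minute step picks, for each minute, the (state, progress) pair with the largest per-second weight; here is the same array_agg(... order by ... limit 1) trick in isolation (dummy numbers):

select grp, array_agg(struct(state, weight) order by weight desc limit 1)[offset(0)].state as majority_state
from unnest([
  struct('m1' as grp, 2 as state, 40 as weight),
  ('m1', 0, 20)
])
group by grp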
If applied to dummy data as in your question, the output is:
Upvotes: 2