fullstack

Reputation: 834

How to create clusters of records from consecutive events

I have BI data stored in a table in snowflake. To simplify, let's say there are only 3 columns in the table:

user_id event_time event_key

I would like to create key clusters on top of the key events. For each user, I want to find groups of consecutive rows whose event_key is in <event_keys_array> and whose time difference (event_time) from the previous event in the set is less than 30 seconds.

Meaning, if an event is created less than 30 seconds after the previous event, and there is no event with an event_key outside <event_keys_array> between them, it is considered part of the same cluster.

How can I achieve this?

Upvotes: 1

Views: 182

Answers (1)

Jamie Stooke

Reputation: 76

This can be done inline with a collection of nested window functions. I've taken some liberties with the <event_keys_array> requirement, since there was no example data to go on. I tend to nest subqueries, but this could just as easily be expressed as a chain of CTEs.

The key thing is identifying each cluster start. With that, the rest falls into place.

CREATE OR REPLACE TEMPORARY TABLE event_stream
(
     event_id    NUMBER(38,0)
    ,user_id     NUMBER(38,0)
    ,event_key   NUMBER(38,0)
    ,event_time  TIMESTAMP_NTZ(3)
);

INSERT INTO event_stream
(event_id,user_id,event_key,event_time)
VALUES
     (1 ,1,1,'2023-01-25 16:25:01.123')--User 1 - Cluster 1
    ,(2 ,1,1,'2023-01-25 16:25:22.123')--User 1 - Cluster 1
    ,(3 ,1,1,'2023-01-25 16:25:46.123')--User 1 - Cluster 1
    ,(4 ,1,2,'2023-01-25 16:26:01.123')--User 1 - Cluster 2 (Not in array)
    ,(5 ,1,3,'2023-01-25 16:26:02.123')--User 1 - Cluster 3
    ,(6 ,2,1,'2023-01-25 16:25:01.123')--User 2 - Cluster 1
    ,(7 ,2,1,'2023-01-25 16:26:01.123')--User 2 - Cluster 2
    ,(8 ,2,1,'2023-01-25 16:27:01.123')--User 2 - Cluster 3 (in array)
    ,(9 ,2,3,'2023-01-25 16:27:04.123')--User 2 - Cluster 3 (in array)
    ,(10,2,2,'2023-01-25 16:27:07.123')--User 2 - Cluster 4
    ;


SELECT  --DISTINCT dedupes the final output down to the window function outputs.
        --Remove it to bring event-level data through alongside the cluster details.
        DISTINCT
         D.user_id                                                                                                  AS user_id
        ,MAX(CASE WHEN D.event_position = 1 THEN D.event_time END) OVER(PARTITION BY D.user_id,D.grp)               AS event_cluster_start_time
        ,MAX(CASE WHEN D.event_position_reverse = 1 THEN D.event_time END) OVER(PARTITION BY D.user_id,D.grp)       AS event_cluster_end_time
        ,DATEDIFF(SECOND,event_cluster_start_time,event_cluster_end_time)                                           AS event_cluster_duration_seconds
        ,COUNT(1) OVER(PARTITION BY D.user_id,D.grp)                                                                AS event_cluster_total_contained_events
        ,FIRST_VALUE(D.event_id) OVER(PARTITION BY D.user_id,D.grp ORDER BY D.event_time ASC)                       AS event_cluster_initial_event_id
FROM    (
            SELECT  *
                    ,ROW_NUMBER() OVER(PARTITION BY A.user_id,A.grp ORDER BY A.event_time)      AS event_position
                    ,ROW_NUMBER() OVER(PARTITION BY A.user_id,A.grp ORDER BY A.event_time DESC) AS event_position_reverse
            FROM    (
                        SELECT  *
                                 --A rolling sum of cluster starts at the row level provides a value to partition the data on.
                                ,SUM(A.is_start) OVER(PARTITION BY A.user_id ORDER BY A.event_time ROWS UNBOUNDED PRECEDING) AS grp
                        FROM    (
                                    SELECT   A.event_id
                                            ,A.user_id
                                            ,A.event_key
                                            ,array_contains(A.event_key::variant, array_construct(1,3)) AS event_key_grouped
                                            ,A.event_time
                                            ,LAG(event_time,1) OVER(PARTITION BY A.user_id ORDER BY A.event_time) AS previous_event_time
                                            ,LAG(event_key_grouped,1) OVER(PARTITION BY A.user_id ORDER BY A.event_time) AS previous_event_key_grouped
                                            ,CASE 
                                                WHEN    --Current event should be grouped with previous if within 30 seconds
                                                        DATEADD(SECOND,-30,A.event_time) <= previous_event_time 
                                                        --add additional cluster inclusion criteria, e.g. same grouped key
                                                    AND event_key_grouped = previous_event_key_grouped
                                                THEN NULL ELSE 1
                                             END  AS is_start
                                    FROM    event_stream   A
                                )   AS A
                    )   AS A
        )   AS D
ORDER BY 1,2;
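If it helps to sanity-check the window-function logic outside Snowflake, here is a minimal Python sketch of the same gaps-and-islands approach on the sample data above: flag each row as a cluster start when it is more than 30 seconds after the previous event (or its "in the array" status differs), then keep a running count of start flags per user, mirroring the `SUM(is_start) OVER (...)` partitioning column. The `KEYS_IN_ARRAY` set stands in for the question's `<event_keys_array>`.

```python
from datetime import datetime, timedelta

# Same sample data as the INSERT above: (event_id, user_id, event_key, event_time)
events = [
    (1, 1, 1, "2023-01-25 16:25:01.123"),
    (2, 1, 1, "2023-01-25 16:25:22.123"),
    (3, 1, 1, "2023-01-25 16:25:46.123"),
    (4, 1, 2, "2023-01-25 16:26:01.123"),
    (5, 1, 3, "2023-01-25 16:26:02.123"),
    (6, 2, 1, "2023-01-25 16:25:01.123"),
    (7, 2, 1, "2023-01-25 16:26:01.123"),
    (8, 2, 1, "2023-01-25 16:27:01.123"),
    (9, 2, 3, "2023-01-25 16:27:04.123"),
    (10, 2, 2, "2023-01-25 16:27:07.123"),
]

KEYS_IN_ARRAY = {1, 3}          # stands in for <event_keys_array>
WINDOW = timedelta(seconds=30)  # max gap within a cluster

def cluster(events):
    """Map event_id -> (user_id, cluster_number) using a running sum of
    cluster-start flags, mirroring the SQL's is_start / grp columns."""
    clusters = {}
    prev = {}  # user_id -> (event_time, in_array) of that user's previous event
    grp = {}   # user_id -> current cluster number (the running SUM(is_start))
    for event_id, user_id, key, ts in sorted(events, key=lambda e: (e[1], e[3])):
        t = datetime.strptime(ts, "%Y-%m-%d %H:%M:%S.%f")
        in_array = key in KEYS_IN_ARRAY
        p = prev.get(user_id)
        # A row starts a new cluster unless it is within 30s of the previous
        # event AND has the same grouped-key status (the SQL's CASE expression).
        is_start = not (p and t - p[0] <= WINDOW and in_array == p[1])
        grp[user_id] = grp.get(user_id, 0) + (1 if is_start else 0)
        clusters[event_id] = (user_id, grp[user_id])
        prev[user_id] = (t, in_array)
    return clusters
```

Running this reproduces the cluster numbers in the INSERT's comments (e.g. events 8 and 9 fall into user 2's third cluster, and event 10 starts a fourth).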

If you wanted to split clusters by another field value such as event_key you just need to add the field to all window function partitions.

Result set: (screenshot omitted)

Upvotes: 2
