Manish Jaiswal

Conditional count in Spark

I want to count the number of pages a user visits in a session. My problem is that a user can have multiple sessions in a day, and I have user_id, login_status, timestamp and page_id as follows:

user_id  login_status     timestamp               page_id
534        False        06-06-2019 12:12:30       0
534        False        06-06-2019 12:12:35       0
534        True         06-06-2019 12:17:30       1
534        True         06-06-2019 12:18:35       3
534        False        06-06-2019 12:19:35       0
534        False        06-06-2019 12:20:35       0
534        True         06-06-2019 12:21:30       8
534        True         06-06-2019 12:22:35       7
534        True         06-06-2019 12:23:30       1
534        False        06-06-2019 12:14:35       0

Expected output:

user_id      timestamp               Page_count 
534         06-06-2019 12:17:30       2
534         06-06-2019 12:21:30       3

A session starts (login) when the status becomes true and ends (logout) when the status becomes false. How can I count the number of pages visited in a single session? Thanks in advance.


Answers (1)

BlueSheepToken

The idea here is to define when a new session starts. If I understood correctly, that is when login_status is true and differs from its value on the previous row (lag(login_status, 1) =!= login_status && login_status). Cast this condition to an int and take a running sum over it to give an id to each session.

After that, a simple groupBy should do it.
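To make the idea concrete, here is a minimal plain-Scala sketch of the same steps on the question's sample rows, using ordinary collections instead of Spark. The Visit case class and the sessionCounts helper are hypothetical names for illustration, and the sketch assumes all timestamps fall on the same day, so sorting the raw strings is chronological. Logged-out rows are filtered out before grouping, which plays the same role as keeping them at session 0 and dropping that session.

```scala
// Hypothetical row type mirroring the question's columns
final case class Visit(userId: Int, loggedIn: Boolean, timestamp: String, pageId: Int)

val visits = List(
  Visit(534, false, "06-06-2019 12:12:30", 0),
  Visit(534, false, "06-06-2019 12:12:35", 0),
  Visit(534, true, "06-06-2019 12:17:30", 1),
  Visit(534, true, "06-06-2019 12:18:35", 3),
  Visit(534, false, "06-06-2019 12:19:35", 0),
  Visit(534, false, "06-06-2019 12:20:35", 0),
  Visit(534, true, "06-06-2019 12:21:30", 8),
  Visit(534, true, "06-06-2019 12:22:35", 7),
  Visit(534, true, "06-06-2019 12:23:30", 1),
  Visit(534, false, "06-06-2019 12:14:35", 0)
)

// Returns (userId, first timestamp of the session, number of pages in the session)
def sessionCounts(rows: List[Visit]): List[(Int, String, Int)] =
  rows.groupBy(_.userId).toList.sortBy(_._1).flatMap { case (user, userRows) =>
    // Sort by time (string order is chronological here: same date, same format)
    val sorted = userRows.sortBy(_.timestamp)
    // Like lag(login_status, 1) with a default of false for the first row
    val previous = false :: sorted.map(_.loggedIn)
    // 1 marks a session start: logged in now, not logged in on the previous row
    val flags = sorted.zip(previous).map { case (v, prev) => if (v.loggedIn && !prev) 1 else 0 }
    // Running sum of the flags gives every row a session id
    val ids = flags.scanLeft(0)(_ + _).tail
    sorted.zip(ids)
      .filter { case (v, _) => v.loggedIn }
      .groupBy { case (_, id) => id }
      .toList
      .sortBy(_._1)
      .map { case (_, inSession) =>
        (user, inSession.map(_._1.timestamp).min, inSession.size)
      }
  }

sessionCounts(visits).foreach(println)
// (534,06-06-2019 12:17:30,2)
// (534,06-06-2019 12:21:30,3)
```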

Let's do it with window functions!

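// Some needed imports (assumes a SparkSession named spark, as in spark-shell)
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{count, lag, min, sum}
import spark.implicits._ // for toDF and the $"column" syntax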

// Our df
val df = Seq((534, false, "06-06-2019 12:12:30", 0),
(534, false, "06-06-2019 12:12:35", 0),
(534, true, "06-06-2019 12:17:30", 1),
(534, true, "06-06-2019 12:18:35", 3),
(534, false, "06-06-2019 12:19:35", 0),
(534, false, "06-06-2019 12:20:35", 0),
(534, true, "06-06-2019 12:21:30", 8),
(534, true, "06-06-2019 12:22:35", 7),
(534, true, "06-06-2019 12:23:30", 1),
(534, false, "06-06-2019 12:14:35", 0)).toDF("user_id", "login_status", "timestamp", "page_id")

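// The window over which we apply lag to detect the start of a new session
val userSessionWindow = Window.partitionBy("user_id").orderBy("timestamp")
// The window for the running sum that gives ids to our sessions; partitioning by login_status
// keeps the logged-out rows in their own partition, so their session id stays 0
val userWindow = Window.partitionBy("user_id", "login_status").orderBy("timestamp")

// This is how we define a new session (logged in now, not logged in on the previous row); you can change it as you want.
// =!= replaces the deprecated !== operator, and the default false lets a user's very first row start a session.
val newSession = ($"login_status" =!= lag($"login_status", 1, false).over(userSessionWindow) && $"login_status").cast("bigint")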

df.withColumn("new_session", newSession).show

So here, each new session gets a 1, which is exactly what we want!

+-------+------------+-------------------+-------+-----------+
|user_id|login_status|          timestamp|page_id|new_session|
+-------+------------+-------------------+-------+-----------+
|    534|       false|06-06-2019 12:12:30|      0|          0|
|    534|       false|06-06-2019 12:12:35|      0|          0|
|    534|       false|06-06-2019 12:14:35|      0|          0|
|    534|        true|06-06-2019 12:17:30|      1|          1|
|    534|        true|06-06-2019 12:18:35|      3|          0|
|    534|       false|06-06-2019 12:19:35|      0|          0|
|    534|       false|06-06-2019 12:20:35|      0|          0|
|    534|        true|06-06-2019 12:21:30|      8|          1|
|    534|        true|06-06-2019 12:22:35|      7|          0|
|    534|        true|06-06-2019 12:23:30|      1|          0|
+-------+------------+-------------------+-------+-----------+
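One edge case worth knowing: on each user's first row, lag has no previous value and, without a default, returns null. Under SQL three-valued logic, (true =!= null) && true is null, so if a user's very first row is already logged in, its flag is null instead of 1 and that first session never gets an id (it ends up in session 0 or null and is filtered out). The sample data starts logged out, so the table above is not affected. The plain-Scala sketch below models that null with Option (lagFlags is a hypothetical helper) and shows how a default of false, as accepted by the lag(e, offset, defaultValue) overload, removes the problem.

```scala
// Hypothetical helper: models Spark's new_session flag for one user's rows in time order.
// None stands for SQL null; `default` plays the role of lag's third argument.
def lagFlags(statuses: List[Boolean], default: Option[Boolean]): List[Option[Int]] = {
  // previous(i) is the status of row i - 1, or `default` for the first row
  val previous: List[Option[Boolean]] = default :: statuses.map(Some(_))
  statuses.zip(previous).map {
    case (cur, Some(prev)) => Some(if (cur != prev && cur) 1 else 0)
    // (cur =!= null) is null; (null && cur) is false when cur is false, null otherwise
    case (cur, None) => if (cur) None else Some(0)
  }
}

val startsLoggedIn = List(true, true, false, true)
println(lagFlags(startsLoggedIn, None))        // List(None, Some(0), Some(0), Some(1))
println(lagFlags(startsLoggedIn, Some(false))) // List(Some(1), Some(0), Some(0), Some(1))
```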

Let's give ids to our sessions with a running sum.

val withSessionIDs = df.withColumn("session", sum(newSession).over(userWindow))

withSessionIDs.show
+-------+------------+-------------------+-------+-------+
|user_id|login_status|          timestamp|page_id|session|
+-------+------------+-------------------+-------+-------+
|    534|       false|06-06-2019 12:12:30|      0|      0|
|    534|       false|06-06-2019 12:12:35|      0|      0|
|    534|       false|06-06-2019 12:14:35|      0|      0|
|    534|       false|06-06-2019 12:19:35|      0|      0|
|    534|       false|06-06-2019 12:20:35|      0|      0|
|    534|        true|06-06-2019 12:17:30|      1|      1|
|    534|        true|06-06-2019 12:18:35|      3|      1|
|    534|        true|06-06-2019 12:21:30|      8|      2|
|    534|        true|06-06-2019 12:22:35|      7|      2|
|    534|        true|06-06-2019 12:23:30|      1|      2|
+-------+------------+-------------------+-------+-------+
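Why is login_status part of userWindow? The plain-Scala sketch below (runningSum is a hypothetical helper, and the flags are copied from the new_session column above) compares a running sum over the whole user partition with one computed separately per login_status. Without the extra partition column, the logged-out rows at 12:19:35 and 12:20:35 would inherit session 1 and inflate its count from 2 to 4. Filtering out logged-out rows before the groupBy would be another way to avoid that.

```scala
// Login statuses and new_session flags in timestamp order, copied from the first table
val statuses = List(false, false, false, true, true, false, false, true, true, true)
val flags    = List(0, 0, 0, 1, 0, 0, 0, 1, 0, 0)

// Hypothetical helper: inclusive running sum, like sum(...).over(window ordered by timestamp)
def runningSum(xs: List[Int]): List[Int] = xs.scanLeft(0)(_ + _).tail

// Like Window.partitionBy("user_id"): one running sum over all rows
val unpartitioned = runningSum(flags)

// Like Window.partitionBy("user_id", "login_status"): a separate running sum per status
val partitioned: List[Int] = {
  val indexed = statuses.zip(flags).zipWithIndex // ((status, flag), rowIndex)
  val idByRow: Map[Int, Int] = indexed
    .groupBy { case ((status, _), _) => status }
    .values
    .flatMap { rows => rows.map(_._2).zip(runningSum(rows.map(_._1._2))) }
    .toMap
  statuses.indices.toList.map(i => idByRow(i))
}

println(unpartitioned) // List(0, 0, 0, 1, 1, 1, 1, 2, 2, 2)
println(partitioned)   // List(0, 0, 0, 1, 1, 0, 0, 2, 2, 2)
```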

We now have an ID for each session, so we can simply do a groupBy!

// Do not forget to remove session 0, which holds every row where the user is not logged in.
// min("timestamp") is used instead of first("timestamp"), whose result depends on row order after the shuffle.
withSessionIDs.groupBy("user_id", "session").agg(count("*").as("page_visited"), min("timestamp").as("first_login")).where($"session" =!= 0).drop("session").show
+-------+------------+-------------------+
|user_id|page_visited|        first_login|
+-------+------------+-------------------+
|    534|           2|06-06-2019 12:17:30|
|    534|           3|06-06-2019 12:21:30|
+-------+------------+-------------------+
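A caveat on the timestamp column: it is a string in MM-dd-yyyy HH:mm:ss format, so orderBy("timestamp") sorts it lexicographically. That matches chronological order within a year but not across a year boundary. The plain-Scala sketch below shows the difference with java.time; in Spark the equivalent would be to parse the column first, for example with to_timestamp($"timestamp", "MM-dd-yyyy HH:mm:ss"), and order by the parsed column. The two sample strings are made up for the illustration.

```scala
import java.time.{LocalDateTime, ZoneOffset}
import java.time.format.DateTimeFormatter

// Same layout as the question's timestamp column
val fmt = DateTimeFormatter.ofPattern("MM-dd-yyyy HH:mm:ss")

// Two made-up readings twenty seconds apart, straddling New Year
val raw = List("01-01-2020 00:00:10", "12-31-2019 23:59:50")

// Sorting the strings compares the month first, so 2020 lands before 2019
val lexicographic = raw.sorted
// Parsing first gives the true chronological order
val chronological = raw.sortBy(s => LocalDateTime.parse(s, fmt).toEpochSecond(ZoneOffset.UTC))

println(lexicographic) // List(01-01-2020 00:00:10, 12-31-2019 23:59:50)
println(chronological) // List(12-31-2019 23:59:50, 01-01-2020 00:00:10)
```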
