Normal

Reputation: 1367

spark sql: how to create sessionId for user-item

Let's say I've got a dataset like this:

|    item    | event       | timestamp |   user    |
|:-----------|------------:|:---------:|:---------:|
| titanic    | view        |     1     |    1      |
| titanic    | add to bag  |     2     |    1      | 
| titanic    | close       |     3     |    1      | 
| avatar     | view        |     6     |    1      |
| avatar     | close       |     10    |    1      |
| titanic    | view        |     20    |    1      |
| titanic    | purchase    |     30    |    1      |

and so on. I need to calculate a sessionId for each user for consecutive events that refer to the same item.

So for that particular data the output should be the following:

|    item    | event       | timestamp |   user    |   sessionId    |
|:-----------|------------:|:---------:|:---------:|:--------------:|
| titanic    | view        |     1     |    1      |   session1     |
| titanic    | add to bag  |     2     |    1      |   session1     |
| titanic    | close       |     3     |    1      |   session1     |
| avatar     | view        |     6     |    1      |   session2     |
| avatar     | close       |     10    |    1      |   session2     |
| titanic    | view        |     20    |    1      |   session3     |
| titanic    | purchase    |     30    |    1      |   session3     |

I was trying to use an approach similar to the one described in Spark: How to create a sessionId based on userId and timestamp, with this window:

Window.partitionBy("user", "item").orderBy("timestamp")

But that just doesn't work, because the same user-item combination can occur in different sessions; see session1 and session3, for example. With that window they end up in the same session. I need help with another approach to implement this.

Upvotes: 2

Views: 1179

Answers (2)

Gordon Linoff

Reputation: 1269447

You seem to need to count the number of "view" records cumulatively. If so:

select t.*,
       sum(case when event = 'view' then 1 else 0 end) over (partition by user order by timestamp) as session
from t;
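
In Spark you can express the same cumulative-count idea with the DataFrame API. This is only a sketch, assuming the table t above is loaded into a DataFrame named df and that each session starts with a "view" event:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

// running count of "view" events per user: every "view" starts a new session
val byUser = Window.partitionBy($"user").orderBy($"timestamp")

df.
  withColumn("session",
    concat(lit("session"),
      sum(when($"event" === "view", 1).otherwise(0)).over(byUser))).
  show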

Upvotes: 1

Leo C

Reputation: 22439

Here's one approach that first generates a column of conditionally null timestamp values, uses last(ts, ignoreNulls) along with rowsBetween to backfill it with the last non-null timestamp, and finally constructs the sessionId using dense_rank:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

val df = Seq(
  ("titanic", "view", 1, 1),
  ("titanic", "add to bag", 2, 1),
  ("titanic", "close", 3, 1),
  ("avatar", "view", 6, 1),
  ("avatar", "close", 10, 1),
  ("titanic", "view", 20, 1),
  ("titanic", "purchase", 30, 1)
).toDF("item", "event", "timestamp", "user")

val win1 = Window.partitionBy($"user").orderBy($"timestamp")
val win2 = Window.partitionBy($"user").orderBy($"sessTS")

df.
  withColumn( "firstTS",
    when( row_number.over(win1) === 1 || $"item" =!= lag($"item", 1).over(win1),
      $"timestamp" )
  ).
  withColumn( "sessTS",
    last($"firstTS", ignoreNulls = true).
      over(win1.rowsBetween(Window.unboundedPreceding, 0))
  ).
  withColumn("sessionId", concat(lit("session"), dense_rank.over(win2))).
  show

// +-------+----------+---------+----+-------+------+---------+
// |   item|     event|timestamp|user|firstTS|sessTS|sessionId|
// +-------+----------+---------+----+-------+------+---------+
// |titanic|      view|        1|   1|      1|     1| session1|
// |titanic|add to bag|        2|   1|   null|     1| session1|
// |titanic|     close|        3|   1|   null|     1| session1|
// | avatar|      view|        6|   1|      6|     6| session2|
// | avatar|     close|       10|   1|   null|     6| session2|
// |titanic|      view|       20|   1|     20|    20| session3|
// |titanic|  purchase|       30|   1|   null|    20| session3|
// +-------+----------+---------+----+-------+------+---------+
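
A more compact variant of the same windowing idea, sketched under the assumption that a new session starts exactly when the item changes within a user's timeline (reusing the imports and df from above), is to flag those change rows and take a running sum of the flag:

val byUser = Window.partitionBy($"user").orderBy($"timestamp")

df.
  withColumn("newSession",
    // 1 when there is no previous row for this user or the item differs from the previous row
    when(lag($"item", 1).over(byUser).isNull ||
         $"item" =!= lag($"item", 1).over(byUser), 1).otherwise(0)).
  withColumn("sessionId", concat(lit("session"), sum($"newSession").over(byUser))).
  show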

Upvotes: 6
