Reputation: 668
There's a table storing the times at which users listened to music, which looks like the following:
| user | music | listen_time |
| A | m | 2019-07-01 16:00:00 |
| A | n | 2019-07-01 16:05:00 |
| A | x | 2019-07-01 16:10:00 |
| A | y | 2019-07-01 17:10:00 |
| A | z | 2019-07-02 18:10:00 |
| A | m | 2019-07-02 18:15:00 |
| B | t | 2019-07-02 18:15:00 |
| B | s | 2019-07-02 18:20:00 |
The result should be, for each user, the lists of songs played with less than 30 minutes between consecutive plays. It should look like this (music_list should be an ArrayType column):
| user | music_list |
| A | m, n, x |
| A | y |
| A | z, m |
| B | t, s |
How could I implement this with a Scala Spark DataFrame?
Upvotes: 0
Views: 901
Reputation: 6169
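The idea behind this kind of exercise, which is a really good one for mastering Spark, is to use lag to flag the start of each session and then a cumulative sum to turn those flags into session IDs.
So the steps are:
1. For each user, order the plays by time and use lag to get the timestamp of the previous play.
2. Flag a row with 1 when it starts a new session (it is the user's first play, or it comes more than 30 minutes after the previous play) and with 0 otherwise.
3. Take a cumulative sum of the flags per user; the running total is the session ID.
4. Group by user and session ID and collect the music into a list.
I strongly suggest you try following these instructions before reading the next part of this answer.
Here is the solution: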
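import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.{functions => F}
import org.apache.spark.sql.expressions.Window
val spark = SparkSession.builder().getOrCreate()
import spark.implicits._ // needed for toDF and the $"col" syntax
// Create the data
// Here we use Unix time, which makes it easier to check for the 30-minute difference.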
val df = Seq(("A", "m", "2019-07-01 16:00:00"),
("A", "n", "2019-07-01 16:05:00"),
("A", "x", "2019-07-01 16:10:00"),
("A", "y", "2019-07-01 17:10:00"),
("A", "z", "2019-07-02 18:10:00"),
("A", "m", "2019-07-02 18:15:00"),
("B", "t", "2019-07-02 18:15:00"),
("B", "s", "2019-07-02 18:20:00")).toDF("user", "music", "listen").withColumn("unix", F.unix_timestamp($"listen", "yyyy-MM-dd HH:mm:ss"))
// The window over which we lag to detect a new session
val userSessionWindow = Window.partitionBy("user").orderBy("unix")
// This puts a 1 in front of each new session. The condition changes according to how you define a "new session"
val newSession = ($"unix" > F.lag($"unix", 1).over(userSessionWindow) + 30 * 60).cast("bigint")
// Each user's first play has no previous row, so lag returns null; fill it with 1
val dfWithNewSession = df.withColumn("newSession", newSession).na.fill(1, Seq("newSession"))
|user|music| listen| unix|newSession|
| B| t|2019-07-02 18:15:00|1562084100| 1|
| B| s|2019-07-02 18:20:00|1562084400| 0|
| A| m|2019-07-01 16:00:00|1561989600| 1|
| A| n|2019-07-01 16:05:00|1561989900| 0|
| A| x|2019-07-01 16:10:00|1561990200| 0|
| A| y|2019-07-01 17:10:00|1561993800| 1|
| A| z|2019-07-02 18:10:00|1562083800| 1|
| A| m|2019-07-02 18:15:00|1562084100| 0|
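To see what the lag condition computes, here is a minimal plain-Scala sketch with no Spark dependency. It hard-codes user A's `unix` values from the table above. The object name `NewSessionFlags` and the `gapSeconds` constant are illustrative, not part of any API. Pairing each timestamp with its predecessor stands in for `lag(unix, 1)`.

```scala
// Plain-Scala sketch of the newSession flag (no Spark needed).
// Timestamps are user A's `unix` values from the table above, already sorted.
object NewSessionFlags {
  val gapSeconds = 30 * 60 // session gap threshold: 30 minutes

  val times: Seq[Long] =
    Seq(1561989600L, 1561989900L, 1561990200L, 1561993800L, 1562083800L, 1562084100L)

  // Pair each timestamp with its predecessor, mimicking lag(unix, 1).
  // The first row has no predecessor, so it gets None (Spark gives null).
  val lagged: Seq[(Option[Long], Long)] =
    (None +: times.init.map(Some(_))).zip(times)

  // 1 for the first row (like na.fill(1)) or when the gap exceeds 30 minutes, else 0.
  val flags: Seq[Int] = lagged.map {
    case (None, _)          => 1
    case (Some(prev), curr) => if (curr > prev + gapSeconds) 1 else 0
  }

  def main(args: Array[String]): Unit = println(flags.mkString(", "))
}
```

The flags come out as 1, 0, 0, 1, 1, 0, matching the `newSession` column for user A.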
// To give each of a user's sessions an ID, we just need a cumulative sum of newSession per user
val userWindow = Window.partitionBy("user").orderBy("unix")
val dfWithSessionId = dfWithNewSession.withColumn("session", F.sum("newSession").over(userWindow))
|user|music| listen| unix|newSession|session|
| B| t|2019-07-02 18:15:00|1562084100| 1| 1|
| B| s|2019-07-02 18:20:00|1562084400| 0| 1|
| A| m|2019-07-01 16:00:00|1561989600| 1| 1|
| A| n|2019-07-01 16:05:00|1561989900| 0| 1|
| A| x|2019-07-01 16:10:00|1561990200| 0| 1|
| A| y|2019-07-01 17:10:00|1561993800| 1| 2|
| A| z|2019-07-02 18:10:00|1562083800| 1| 3|
| A| m|2019-07-02 18:15:00|1562084100| 0| 3|
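The cumulative sum can be checked the same way. This is a hedged plain-Scala sketch: the flags are copied from user A's rows above, and `scanLeft` plays the role of `sum(...).over(...)` on an ordered window. The object name `SessionIds` is only illustrative.

```scala
// Plain-Scala sketch of turning newSession flags into session IDs (no Spark needed).
object SessionIds {
  // newSession flags for user A, copied from the table above
  val flags: Seq[Int] = Seq(1, 0, 0, 1, 1, 0)

  // Running total of the flags; scanLeft emits the initial 0 first, so drop it.
  // This mirrors sum("newSession") over a window ordered by unix.
  val sessionIds: Seq[Int] = flags.scanLeft(0)(_ + _).tail

  def main(args: Array[String]): Unit = println(sessionIds.mkString(", "))
}
```

One difference from the Spark version: a window with `orderBy` and no explicit frame uses `RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW`. If a user had two plays with the same `unix` value, both rows would therefore receive the same running total.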
// Note: collect_list does not guarantee element order after the shuffle done by groupBy
val dfFinal = dfWithSessionId.groupBy("user", "session").agg(F.collect_list("music").as("music")).select("user", "music")
dfFinal.show()
|user| music|
| B| [t, s]|
| A|[m, n, x]|
| A| [y]|
| A| [z, m]|
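As a cross-check of the whole approach, here is a minimal end-to-end plain-Scala sketch (no Spark). It assumes the `unix` values from the tables above, and the names `Sessionize`, `Listen` and `gapSeconds` are hypothetical. Each user's plays are sorted by time and split wherever the gap exceeds 30 minutes. Sorting explicitly keeps every list in play order, which `collect_list` does not promise.

```scala
// Plain-Scala sketch of gap-based sessionization over in-memory data (no Spark needed).
object Sessionize {
  final case class Listen(user: String, music: String, unix: Long)

  // In-memory copy of the sample data; unix values taken from the tables above
  val listens: Seq[Listen] = Seq(
    Listen("A", "m", 1561989600L), Listen("A", "n", 1561989900L),
    Listen("A", "x", 1561990200L), Listen("A", "y", 1561993800L),
    Listen("A", "z", 1562083800L), Listen("A", "m", 1562084100L),
    Listen("B", "t", 1562084100L), Listen("B", "s", 1562084400L)
  )

  val gapSeconds = 30 * 60

  // Split one user's plays wherever the gap to the previous play exceeds gapSeconds.
  // Sessions are built newest-first, so current.head is the latest play in the open session.
  def sessions(plays: Seq[Listen]): List[List[String]] =
    plays.sortBy(_.unix).foldLeft(List.empty[List[Listen]]) {
      case (current :: done, l) if l.unix <= current.head.unix + gapSeconds =>
        (l :: current) :: done // close enough: extend the open session
      case (acc, l) => List(l) :: acc // first play or gap too large: start a new session
    }.reverse.map(_.reverse.map(_.music))

  // (user, music_list) rows, users in alphabetical order
  val result: List[(String, List[String])] =
    listens.groupBy(_.user).toList.sortBy(_._1).flatMap {
      case (user, plays) => sessions(plays).map(user -> _)
    }

  def main(args: Array[String]): Unit =
    result.foreach { case (u, ms) => println(s"$u | ${ms.mkString(", ")}") }
}
```

This produces the four rows from the question: A with [m, n, x], [y] and [z, m], and B with [t, s].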
Upvotes: 1
Reputation: 13581
This is a hint.
import org.apache.spark.sql.functions._
df.groupBy($"user", window($"listen_time", "30 minutes")).agg(collect_list($"music"))
The result is
|user|window |collect_list(music)|
|A |[2019-07-01 16:00:00, 2019-07-01 16:30:00]|[m, n, x] |
|B |[2019-07-02 18:00:00, 2019-07-02 18:30:00]|[t, s] |
|A |[2019-07-02 18:00:00, 2019-07-02 18:30:00]|[z, m] |
|A |[2019-07-01 17:00:00, 2019-07-01 17:30:00]|[y] |
This is a similar result, but not exactly the same. Use concat_ws after collect_list and you can obtain m, n, x.
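Why "not exactly the same": `window` produces fixed 30-minute buckets rather than gap-based sessions. Two plays a few minutes apart can therefore land in different buckets. The plain-Scala sketch below assumes the buckets are aligned like `floor(t / 1800) * 1800` on Unix seconds. The two timestamps are hypothetical values chosen to straddle a bucket boundary, and the object name is illustrative.

```scala
// Plain-Scala sketch: fixed 30-minute buckets vs a 30-minute gap rule (no Spark needed).
object TumblingVsGap {
  val thirtyMinutes = 30 * 60

  // Start of the fixed 30-minute bucket containing t (epoch seconds, assumed non-negative)
  def bucket(t: Long): Long = (t / thirtyMinutes) * thirtyMinutes

  // Hypothetical plays 10 minutes apart, 5 minutes either side of 1561991400
  // (a multiple of 1800, hence a bucket boundary)
  val first = 1561991100L
  val second = 1561991700L

  // Fixed windows put the two plays in different buckets...
  val sameBucket: Boolean = bucket(first) == bucket(second)
  // ...while a "gap of at most 30 minutes" rule keeps them in one session
  val sameSession: Boolean = second - first <= thirtyMinutes

  def main(args: Array[String]): Unit =
    println(s"sameBucket=$sameBucket, sameSession=$sameSession")
}
```

On the question's sample data no session crosses a boundary, so both approaches happen to agree there.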
Upvotes: 2
Reputation: 1409
This will work for you
import org.apache.spark.sql.functions._
import spark.implicits._ // for toDF
val data = Seq(("A", "m", "2019-07-01 16:00:00"),
("A", "n", "2019-07-01 16:05:00"),
("A", "x", "2019-07-01 16:10:00"),
("A", "y", "2019-07-01 17:10:00"),
("A", "z", "2019-07-02 18:10:00"),
("A", "m", "2019-07-02 18:15:00"),
("B", "t", "2019-07-02 18:15:00"),
("B", "s", "2019-07-02 18:20:00"))
// Floor a Unix timestamp (seconds) to the start of its 30-minute interval
val getinterval = udf((time: Long) => {
  (time / 1800) * 1800
})
val df = data.toDF("user", "music", "listen")
.withColumn("unixtime", unix_timestamp(col("listen")))
.withColumn("interval", getinterval(col("unixtime")))
val res = df.groupBy(col("user"), col("interval"))
  .agg(collect_list(col("music")).as("music_list"))
res.show()
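The `getinterval` arithmetic can be tried without Spark. This hedged sketch groups the sample rows by `(user, getInterval(unix))` and keeps each group in time order. The Unix values are copied from the first answer's output table, so they reflect that answer's session time zone. `IntervalGrouping` is an illustrative name only.

```scala
// Plain-Scala sketch of grouping by (user, 30-minute interval) (no Spark needed).
object IntervalGrouping {
  // Same arithmetic as the getinterval UDF: floor a Unix time (seconds) to its 30-minute bucket
  def getInterval(time: Long): Long = (time / 1800) * 1800

  // (user, music, unixtime) rows; unix values copied from the first answer's table
  val rows: Seq[(String, String, Long)] = Seq(
    ("A", "m", 1561989600L), ("A", "n", 1561989900L), ("A", "x", 1561990200L),
    ("A", "y", 1561993800L), ("A", "z", 1562083800L), ("A", "m", 1562084100L),
    ("B", "t", 1562084100L), ("B", "s", 1562084400L)
  )

  // Group by (user, interval), order the groups, and list each group's music in time order
  val grouped: List[((String, Long), List[String])] =
    rows.groupBy { case (user, _, t) => (user, getInterval(t)) }
      .toList
      .sortBy { case ((user, interval), _) => (user, interval) }
      .map { case (key, plays) => key -> plays.sortBy(_._3).map(_._2).toList }

  def main(args: Array[String]): Unit =
    grouped.foreach { case ((u, i), ms) => println(s"$u $i [${ms.mkString(", ")}]") }
}
```

For this sample the groups match the expected output. As with `window`, though, fixed intervals can split plays that straddle a bucket boundary, which the gap-based approach would keep together.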
Upvotes: 1