Alex

Reputation: 57

How to loop through a dataset to create a summary dataset

I just started learning and using Spark, and I'm currently facing a problem. Any suggestion or hint will be greatly appreciated.

Basically I have a dataset that contains all kinds of events from different users, such as AppLaunch, GameStart, GameEnd, etc., and I want to create a summary of each user's actions for each time he/she starts the app.

For example: I have the following dataset:
UserId | EventType | Time     | GameType | EventId
11111  | AppLauch  | 11:01:53 | null     | 101
11111  | GameStart | 11:01:59 | Puzzle   | 102
11111  | GameEnd   | 11:05:31 | Puzzle   | 103
11111  | GameStart | 11:05:58 | Word     | 104
11111  | GameEnd   | 11:09:13 | Word     | 105
11111  | AppEnd    | 11:09:24 | null     | 106
11111  | AppLauch  | 12:03:43 | null     | 107
22222  | AppLauch  | 12:03:52 | null     | 108
22222  | GameStart | 12:03:59 | Puzzle   | 109
11111  | GameStart | 12:04:01 | Puzzle   | 110
22222  | GameEnd   | 12:06:11 | Puzzle   | 111
11111  | GameEnd   | 12:06:13 | Puzzle   | 112
11111  | AppEnd    | 12:06:23 | null     | 113
22222  | AppEnd    | 12:06:33 | null     | 114
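
In case it helps, this is how I build the sample data in spark-shell (a minimal sketch; I keep Time as a plain string for simplicity):

import spark.implicits._

// sample events from the table above; GameType is null for non-game events
val df = Seq(
  ("11111", "AppLauch",  "11:01:53", null,     101),
  ("11111", "GameStart", "11:01:59", "Puzzle", 102),
  ("11111", "GameEnd",   "11:05:31", "Puzzle", 103),
  ("11111", "GameStart", "11:05:58", "Word",   104),
  ("11111", "GameEnd",   "11:09:13", "Word",   105),
  ("11111", "AppEnd",    "11:09:24", null,     106),
  ("11111", "AppLauch",  "12:03:43", null,     107),
  ("22222", "AppLauch",  "12:03:52", null,     108),
  ("22222", "GameStart", "12:03:59", "Puzzle", 109),
  ("11111", "GameStart", "12:04:01", "Puzzle", 110),
  ("22222", "GameEnd",   "12:06:11", "Puzzle", 111),
  ("11111", "GameEnd",   "12:06:13", "Puzzle", 112),
  ("11111", "AppEnd",    "12:06:23", null,     113),
  ("22222", "AppEnd",    "12:06:33", null,     114)
).toDF("UserId", "EventType", "Time", "GameType", "EventId")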

And what I want is a dataset similar to this:
EventId | UserId | EventType | Time     | FirstGamePlayed | LastGamePlayed
101     | 11111  | AppLauch  | 11:01:53 | Puzzle          | Word
107     | 11111  | AppLauch  | 12:03:43 | Puzzle          | Puzzle
108     | 22222  | AppLauch  | 12:03:52 | Puzzle          | Puzzle

I only need to know the first game played and the last game played, even if more than 3 games were played in one app launch.

My initial idea was to group the events by user Id and by time window (AppLaunch to AppEnd), and then find a way to scan through the dataset: if a GameStart event falls into any window, it is the FirstGamePlayed, and the last GameStart event before the time of AppEnd is the LastGamePlayed. But I haven't found a way to achieve this.

Any hint/suggestion would be nice.

Thanks

Upvotes: 1

Views: 180

Answers (1)

Raphael Roth

Reputation: 27373

I think this can be solved using window functions followed by an aggregation, like this:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

df
  // enumerate AppLaunches: running count of AppLauch events per user
  .withColumn("AppLauchNr", sum(when($"EventType" === "AppLauch", 1)).over(Window.partitionBy($"UserId").orderBy($"Time".asc)))
  // get first/last game per AppLaunch
  .withColumn("firstGamePlayed", first($"GameType", true).over(Window.partitionBy($"UserId", $"AppLauchNr").orderBy($"Time".asc)))
  .withColumn("lastGamePlayed", first($"GameType", true).over(Window.partitionBy($"UserId", $"AppLauchNr").orderBy($"Time".desc)))
  // now aggregate; UserId must be part of the key because AppLauchNr restarts at 1 for every user
  .groupBy($"UserId", $"AppLauchNr")
  .agg(
    min($"EventId").as("EventId"),
    lit("AppLauch").as("EventType"), // this is always AppLauch
    min($"Time").as("Time"),
    first($"firstGamePlayed", true).as("firstGamePlayed"),
    first($"lastGamePlayed", true).as("lastGamePlayed")
  )
  .drop($"AppLauchNr")
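
On the sample data from the question this should give one summary row per AppLaunch (modulo row and column order):

UserId | EventId | EventType | Time     | firstGamePlayed | lastGamePlayed
11111  | 101     | AppLauch  | 11:01:53 | Puzzle          | Word
11111  | 107     | AppLauch  | 12:03:43 | Puzzle          | Puzzle
22222  | 108     | AppLauch  | 12:03:52 | Puzzle          | Puzzle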

The first and last game played can also be determined using orderBy().groupBy() instead of window functions, but I'm still not sure whether Spark preserves the ordering during aggregation (this is not mentioned in the docs; see e.g. Spark DataFrame: does groupBy after orderBy maintain that order? and the discussion in https://issues.apache.org/jira/browse/SPARK-16207):

df
  // same running count of AppLauch events as above
  .withColumn("AppLauchNr", sum(when($"EventType" === "AppLauch", 1)).over(Window.partitionBy($"UserId").orderBy($"Time".asc)))
  // sort first, then rely on first()/last() picking values in that order
  .orderBy($"UserId", $"AppLauchNr", $"Time")
  .groupBy($"UserId", $"AppLauchNr")
  .agg(
    first($"EventId").as("EventId"),
    first($"EventType").as("EventType"),
    first($"Time").as("Time"),
    first($"GameType", true).as("firstGamePlayed"),
    last($"GameType", true).as("lastGamePlayed")
  )
  .drop($"AppLauchNr")
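
If you want to avoid depending on any ordering surviving the groupBy, a third variant (just a sketch, assuming AppLauchNr has been added as above) widens the window frame to the whole session, so that every row directly carries the session's first and last game:

val session = Window
  .partitionBy($"UserId", $"AppLauchNr")
  .orderBy($"Time".asc)
  .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

df
  // first/last non-null GameType over the entire session frame
  .withColumn("firstGamePlayed", first($"GameType", true).over(session))
  .withColumn("lastGamePlayed", last($"GameType", true).over(session))

The same groupBy/agg with min($"EventId") and min($"Time") then gives the summary row per AppLaunch.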

Upvotes: 1
