Reputation: 1307
I have a data frame in the format below:
|u_name|Date |Hour | Content_id|WatchTime(sec) |
|user1 | 2019-07-28 | 21 | 100 | 10800 |
|user2 | 2019-07-28 | 20 | 101 | 3600 |
|user3 | 2019-07-28 | 21 | 202 | 7000 |
I need to convert this data frame to the format below. Basically, I need one entry per hour: if WatchTime(sec)
is more than 3600 seconds, the excess should go into a new entry for the next hour.
|u_name|Date |Hour | Content_id|WatchTime(sec) |
|user1 | 2019-07-28 | 21 | 100 | 3600 |
|user1 | 2019-07-28 | 22 | 100 | 3600 |
|user1 | 2019-07-28 | 23 | 100 | 3600 |
|user2 | 2019-07-28 | 20 | 101 | 3600 |
|user3 | 2019-07-28 | 21 | 202 | 3600 |
|user3 | 2019-07-28 | 22 | 202 | 3400 |
This can probably be achieved with SQL, but I am using Scala. What is an efficient way to achieve this?
Upvotes: 3
Views: 313
Reputation: 1751
We can achieve this purely with the Dataset API, as follows:
// Define a case class (java.sql.Date, because Spark encoders do not support java.util.Date)
case class UserStat(uname: String, date: java.sql.Date, hour: Int, contentId: String, watchTimeInSec: Int)
// Define the chunk size in seconds (one hour)
val offset: Int = 3600
// userStatDs is assumed to be a Dataset[UserStat], with spark.implicits._ in scope
userStatDs.flatMap(userStat => {
  // Seconds left over after the last full hour
  val remainingWatchTimeInSec = userStat.watchTimeInSec % offset
  val remainingWatchTimeInSecCount = if (remainingWatchTimeInSec == 0) 0 else 1
  // Number of rows to emit: full hours plus one for the remainder, if any
  val totalIterationCount = (userStat.watchTimeInSec / offset) + remainingWatchTimeInSecCount
  if (userStat.watchTimeInSec <= offset) List(userStat)
  else {
    (0 until totalIterationCount)
      .map(index => {
        if ((userStat.watchTimeInSec / offset) == index)
          // Last, partial hour: carries only the remainder
          userStat.copy(hour = userStat.hour + index, watchTimeInSec = remainingWatchTimeInSec)
        else
          // Full hour
          userStat.copy(hour = userStat.hour + index, watchTimeInSec = offset)
      })
  }
})
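The per-row splitting logic can also be tried without a Spark session. The following is only a minimal sketch in plain Scala, not the answer's exact code: `splitWatchTime` is a hypothetical helper name, and rows are modelled as simple tuples rather than `UserStat`. Unlike the snippet above, it emits no row at all for a watch time of 0.

```scala
// Hypothetical helper (not part of the answer above): splits a watch time
// into (hourOffset, seconds) pairs of at most `chunk` seconds each.
def splitWatchTime(watchTimeInSec: Int, chunk: Int = 3600): Seq[(Int, Int)] =
  (0 until watchTimeInSec by chunk).map { start =>
    // The last chunk may be shorter than `chunk`
    (start / chunk, math.min(chunk, watchTimeInSec - start))
  }

// The three rows from the question, as (user, startHour, watchTimeInSec)
val rows = Seq(("user1", 21, 10800), ("user2", 20, 3600), ("user3", 21, 7000))

// Expand each row into one row per started hour
val expanded = rows.flatMap { case (user, hour, watch) =>
  splitWatchTime(watch).map { case (offset, secs) => (user, hour + offset, secs) }
}

expanded.foreach(println)
```

The same function body could be used inside `flatMap` on a Dataset, which would replace the separate remainder and iteration-count bookkeeping.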
Upvotes: 0
Reputation: 6907
You can achieve this in Spark 2.4+ with the sequence higher-order function and the following transformations:
val result = df
.withColumn("stamps", sequence(lit(0), 'WatchTime, lit(3600)))
.withColumn("offset", explode('stamps))
.withColumn("Hour", 'Hour + ('offset/3600).cast("int"))
.withColumn("WatchTime", 'WatchTime - 'offset)
.withColumn("WatchTime", when('WatchTime <= 3600, 'WatchTime).otherwise(3600))
.filter('WatchTime > 0)
.drop("stamps","offset")
result.show()
+------+-------------------+----+----------+---------+
|u_name| Date|Hour|Content_id|WatchTime|
+------+-------------------+----+----------+---------+
| user1|2019-07-28 00:00:00| 21| 100| 3600|
| user1|2019-07-28 00:00:00| 22| 100| 3600|
| user1|2019-07-28 00:00:00| 23| 100| 3600|
| user2|2019-07-28 00:00:00| 20| 101| 3600|
| user3|2019-07-28 00:00:00| 21| 202| 3600|
| user3|2019-07-28 00:00:00| 22| 202| 3400|
+------+-------------------+----+----------+---------+
This algorithm may generate hours higher than 23. If you need accurate Date and Hour information, I'd advise you to use a single Unix timestamp column combining the start date and hour, since it lets you do time arithmetic and convert back to a proper date and hour when needed.
It would look like this:
val result = df
.withColumn("StartDateTime", unix_timestamp('Date) + ('Hour * 3600 ))
.withColumn("stamps", sequence(lit(0), 'WatchTime, lit(3600)))
.withColumn("offset", explode('stamps))
.withColumn("StartDateTime", from_unixtime('StartDateTime + 'offset))
.withColumn("WatchTime", when('WatchTime - 'offset>3600,3600).otherwise('WatchTime - 'offset))
.filter('WatchTime > 0)
.select('u_name, 'content_id, 'StartDateTime, 'WatchTime)
result.show
+------+----------+-------------------+---------+
|u_name|content_id| StartDateTime|WatchTime|
+------+----------+-------------------+---------+
| user1| 100|2019-07-28 21:00:00| 3600|
| user1| 100|2019-07-28 22:00:00| 3600|
| user1| 100|2019-07-28 23:00:00| 3600|
| user2| 101|2019-07-28 20:00:00| 3600|
| user3| 202|2019-07-28 21:00:00| 3600|
| user3| 202|2019-07-28 22:00:00| 3400|
+------+----------+-------------------+---------+
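To see why a combined date-time value avoids hours above 23, here is a small sketch in plain Scala using `java.time`. It is only an illustration of the rollover and not part of the Spark pipeline above; `slotStart` is a hypothetical helper name.

```scala
import java.time.{LocalDate, LocalDateTime}

// Hypothetical helper: start of the hourly slot that begins `offsetHours`
// after a session starting on `date` (ISO format) at `hour`.
def slotStart(date: String, hour: Int, offsetHours: Int): LocalDateTime =
  LocalDate.parse(date).atTime(hour, 0).plusHours(offsetHours.toLong)

// A session starting at 23:00 that spills into the next hour:
// adding to the Hour column alone would give 24, while the date-time
// arithmetic moves to hour 0 of the following day.
val next = slotStart("2019-07-28", 23, 1)
println(s"date=${next.toLocalDate} hour=${next.getHour}")
```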
Upvotes: 5
Reputation: 1027
I would do something like this:
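// Largest number of full hours in any row, used as the loop bound.
// Named maxHours so it does not shadow the max() function used inside agg.
val maxHours = df
  .agg(max(floor($"WatchTime(sec)" / 3600)).as("max"))
  .select($"max")
  .first
  .getLong(0) // floor returns a bigint (Long) column, so getInt would fail
  .toInt
// Build one DataFrame per hour index, then union them all
val newDf = (0 to maxHours)
  .map { i =>
    // Keep only rows that still have watch time left in hour i
    df.filter($"WatchTime(sec)" > i * 3600)
      .withColumn("Hour", $"Hour" + i)
      .withColumn(
        "WatchTime(sec)",
        // Cap each entry at 3600 seconds
        when($"WatchTime(sec)" - i * 3600 > 3600, 3600)
          .otherwise($"WatchTime(sec)" - i * 3600)
      )
  }
  .reduceLeft { (df1, df2) =>
    df1.union(df2)
  }
  .orderBy($"u_name", $"Date", $"Hour")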
It's just a suggestion on how to do it. There are surely more efficient ways.
Upvotes: 0