Reputation: 93
Note: My grouping can contain up to 5-10K rows per group for the aggregation, so efficient code is highly desirable.
My Data
val df1 = sc.parallelize(Seq(
("user2", "iphone", "2017-12-23 16:58:08", "Success"),
("user2", "iphone", "2017-12-23 16:58:12", "Success"),
("user2", "iphone", "2017-12-23 16:58:20", "Success"),
("user2", "iphone", "2017-12-23 16:58:25", "Success"),
("user2", "iphone", "2017-12-23 16:58:35", "Success"),
("user2", "iphone", "2017-12-23 16:58:45", "Success")
)).toDF("username", "device", "attempt_at", "stat")
+--------+------+-------------------+-------+
|username|device| attempt_at| stat|
+--------+------+-------------------+-------+
| user2|iphone|2017-12-23 16:58:08|Success|
| user2|iphone|2017-12-23 16:58:12|Success|
| user2|iphone|2017-12-23 16:58:20|Success|
| user2|iphone|2017-12-23 16:58:25|Success|
| user2|iphone|2017-12-23 16:58:35|Success|
| user2|iphone|2017-12-23 16:58:45|Success|
+--------+------+-------------------+-------+
What I want
A grouping by (username, device) that returns only the latest time an event occurred.
+--------+------+-------------------+-------+-------------------+
|username|device| attempt_at| stat|previous_attempt_at|
+--------+------+-------------------+-------+-------------------+
| user2|iphone|2017-12-23 16:58:45|Success|2017-12-23 16:58:35|
+--------+------+-------------------+-------+-------------------+
Exceptions in desired output:
As mentioned, the lookup has to stay within a specific time window. For example, in the input dataset below the last row has the latest timestamp, 23rd December. If I ask for a time window going back 1 day and the last attempt within it, the 'previous_attempt_at' column should be null, since there are no events on the previous day, which would be 22nd December. It all depends on the input timestamp range.
//Initial Data
+--------+------+-------------------+-------+
|username|device| attempt_at| stat|
+--------+------+-------------------+-------+
| user2|iphone|2017-12-20 16:58:08|Success|
| user2|iphone|2017-12-20 16:58:12|Success|
| user2|iphone|2017-12-20 16:58:20|Success|
| user2|iphone|2017-12-20 16:58:25|Success|
| user2|iphone|2017-12-20 16:58:35|Success|
| user2|iphone|2017-12-23 16:58:45|Success|
+--------+------+-------------------+-------+
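For reference, a minimal sketch of building this second dataset, assuming the same setup and implicits as the first snippet:

// same schema as before, only the attempt_at values differ
val df1 = sc.parallelize(Seq(
  ("user2", "iphone", "2017-12-20 16:58:08", "Success"),
  ("user2", "iphone", "2017-12-20 16:58:12", "Success"),
  ("user2", "iphone", "2017-12-20 16:58:20", "Success"),
  ("user2", "iphone", "2017-12-20 16:58:25", "Success"),
  ("user2", "iphone", "2017-12-20 16:58:35", "Success"),
  ("user2", "iphone", "2017-12-23 16:58:45", "Success")
)).toDF("username", "device", "attempt_at", "stat")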
// Desired Output
A grouping by (username, device) that returns only the latest time an event occurred.
+--------+------+-------------------+-------+-------------------+
|username|device| attempt_at| stat|previous_attempt_at|
+--------+------+-------------------+-------+-------------------+
| user2|iphone|2017-12-23 16:58:45|Success| null|
+--------+------+-------------------+-------+-------------------+
What I Have.
val w = (Window.partitionBy("username", "device")
.orderBy(col("attempt_at").cast("timestamp").cast("long"))
.rangeBetween(-3600, -1)
)
val df2 = df1.withColumn("previous_attempt_at", last("attempt_at").over(w))
+--------+------+-------------------+-------+-------------------+
|username|device| attempt_at| stat|previous_attempt_at|
+--------+------+-------------------+-------+-------------------+
| user2|iphone|2017-12-23 16:58:08|Success| null|
| user2|iphone|2017-12-23 16:58:12|Success|2017-12-23 16:58:08|
| user2|iphone|2017-12-23 16:58:20|Success|2017-12-23 16:58:12|
| user2|iphone|2017-12-23 16:58:25|Success|2017-12-23 16:58:20|
| user2|iphone|2017-12-23 16:58:35|Success|2017-12-23 16:58:25|
| user2|iphone|2017-12-23 16:58:45|Success|2017-12-23 16:58:35|
+--------+------+-------------------+-------+-------------------+
Notes: The code I have computes the window for every row in each user grouping, which is highly inefficient at large scale, and it still doesn't give just the latest attempt. I don't need any of the rows except the last one per group.
Upvotes: 1
Views: 825
Reputation: 41957
All you need is an additional groupBy and aggregation, but before that you need the collect_list function for a cumulative collection of the previous dates, a udf function to check whether the previous attempt_at is within the time limit, and a struct of the three columns ("attempt_at", "stat", "previous_attempt_at") so that max can select the last one (struct ordering compares the fields in order, so putting attempt_at first makes max pick the row with the latest timestamp):
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions._
import java.time._
import java.time.temporal._
import java.time.format._

// returns the latest previous timestamp within the time limit, or null if there is none
def durationUdf = udf((actualtimestamp: String, timestamps: Seq[String]) => {
  val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")
  val actualDateTime = LocalDateTime.parse(actualtimestamp, formatter)
  // the collected list is cumulative up to and including the current row, so drop the last element
  val diffDates = timestamps.init.filter(x =>
    LocalDateTime.parse(x, formatter).until(actualDateTime, ChronoUnit.DAYS) <= 1)
  if (diffDates.nonEmpty) diffDates.last else null
})

// cumulative window per (username, device), ordered by the attempt time
val w = Window.partitionBy("username", "device").orderBy(col("attempt_at").cast("timestamp").cast("long"))

val df2 = df1
  .withColumn("previous_attempt_at", durationUdf(col("attempt_at"), collect_list("attempt_at").over(w)))
  // pack the columns into a struct so that max keeps the row with the latest attempt_at
  .withColumn("struct", struct(col("attempt_at").cast("timestamp").as("attempt_at"), col("stat"), col("previous_attempt_at")))
  .groupBy("username", "device").agg(max("struct").as("struct"))
  .select(col("username"), col("device"), col("struct.attempt_at"), col("struct.stat"), col("struct.previous_attempt_at"))
This should give you the following for the second example (the one-day window case)
+--------+------+---------------------+-------+-------------------+
|username|device|attempt_at |stat |previous_attempt_at|
+--------+------+---------------------+-------+-------------------+
|user2 |iphone|2017-12-23 16:58:45.0|Success|null |
+--------+------+---------------------+-------+-------------------+
and the following for the first input data
+--------+------+---------------------+-------+-------------------+
|username|device|attempt_at |stat |previous_attempt_at|
+--------+------+---------------------+-------+-------------------+
|user2 |iphone|2017-12-23 16:58:45.0|Success|2017-12-23 16:58:35|
+--------+------+---------------------+-------+-------------------+
You can change the logic from days to hours by changing ChronoUnit.DAYS in the udf function to ChronoUnit.HOURS, and so on.
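For instance, a minimal sketch of that change (only the filter line inside durationUdf differs; the limit of 1 is just an example):

// hypothetical variant: keep previous attempts whose whole-hour difference from the current one is at most 1
val diffDates = timestamps.init.filter(x =>
  LocalDateTime.parse(x, formatter).until(actualDateTime, ChronoUnit.HOURS) <= 1)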
Upvotes: 3