Reputation: 53
How to find the time between events in a group?
For example, I have Streaming Source (Kafka) from which I get many columns. This stream is read into spark, preprocessed, cleaned and only these four columns are kept: "ClientTimestamp" ,"sensor_type", "activity", "User_detail".
Now, I want to calculate the total time for which the critical activity existed for each user.
Clientimestamp Sensor_type activity User_detail
4/11/2021 10:00:00 ultrasonic critical user_A
4/11/2021 10:00:00 ultrasonic normal user_B
4/11/2021 10:03:00 ultrasonic normal user_A
4/11/2021 10:05:00 ultrasonic critical user_B
4/11/2021 10:06:00 ultrasonic critical user_A
4/11/2021 10:07:00 ultrasonic critical user_A
4/11/2021 10:08:00 ultrasonic critical user_B
4/11/2021 10:09:00 ultrasonic critical user_B
so for user_A the total time between all critical activity is calculated by finding difference between two critical events and summing up such differences.
(10:00:00 - 10:06:00)+(10:06:00 - 10:07:00)
therefore for userA critical activity lasted for total minute of (5+1)= 6 minutes.
Similarly for user_B,
(10:05:00 - 10:08:00)+ (10:08:00-10:09:00)
userB critical activity lasted for total minute of (3+1) = 4 minute
For each window, i want to call a custom function that will calculate totaltime. How to apply a function on the group grouped by window?
df = df.withWatermark("clientTimestamp", "10 minutes")\
.groupby(window(df.clientTimestamp, "10 minutes", "10 minutes"), col('User_detail'), col('activity'))
.apply(calculate_time)
Upvotes: 2
Views: 399
Reputation: 18475
It looks like this could be solved by taking the difference between the maximum and minimum time for each User_detail within the Window. Also, a filter on the activity can be applied to ignore "normal" rows.
I do not see a reason why applying a custom function such as "calculate_time" is required here. Please note, I am not completely familiar with Python syntax, but your code could look like below:
df = df \
.filter(df.activity == "critical") \
.withWatermark("clientTimestamp", "10 minutes") \
.groupby(window(df.clientTimestamp, "10 minutes", "10 minutes"), col('User_detail')) \
.agg((max("clientTimestamp") - min("clientTimestamp")).alias("time_difference"))
Upvotes: 1