Reputation: 646
What I'm trying to accomplish is to calculate the total time a ship spends at anchor. The data I'm dealing with is time-series in nature. Throughout a ship's journey from Point A to Point B it can stop and start multiple times.
Basically, for each id (the ship's unique id) I want to calculate the total time spent at anchor (status === "ANCHORED"). For each "anchored" time period, take the last timestamp and subtract it from the first timestamp (or vice versa; I'll just take the absolute value). I can do this easily if a ship only stops once in its journey (with a window function), but I'm having trouble when it stops and starts multiple times throughout a journey. Can a window function handle this?
Here is an example of the data I'm dealing with and expected output:
val df = Seq(
(123, "UNDERWAY", 0),
(123, "ANCHORED", 12), // first anchored (first time around)
(123, "ANCHORED", 20), //take this timestamp and sub from previous
(123, "UNDERWAY", 32),
(123, "UNDERWAY", 44),
(123, "ANCHORED", 50), // first anchored (second time around)
(123, "ANCHORED", 65),
(123, "ANCHORED", 70), //take this timestamp and sub from previous
(123, "ARRIVED", 79)
).toDF("id", "status", "time")
+---+--------+----+
|id |status |time|
+---+--------+----+
|123|UNDERWAY|0 |
|123|ANCHORED|12 |
|123|ANCHORED|20 |
|123|UNDERWAY|32 |
|123|UNDERWAY|44 |
|123|ANCHORED|50 |
|123|ANCHORED|65 |
|123|ANCHORED|70 |
|123|ARRIVED |79 |
+---+--------+----+
// the resulting output I need is as follows (aggregation of total time spent at anchor)
// the ship spent 8 hours at anchor the first time, and then spent
// 20 hours at anchor the second time. So total time is 28 hours
+---+-----------------+
|id |timeSpentAtAnchor|
+---+-----------------+
|123|28 |
+---+-----------------+
Each "segment" the ship is at anchor I want to calculate the time spent at anchor and then add all those segments up to get the total time spent at anchor.
Upvotes: 0
Views: 75
Reputation: 2495
I'm new to Window functions, so this could possibly be done better, but here is what I came up with:
This solution only looks at "this - previous", as opposed to the "last - first" within each "group" of statuses. The net effect should be the same though, since it sums them all together anyway.
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val w = Window.orderBy($"time")

df.withColumn("tdiff", when($"status" === lag($"status", 1).over(w),
                            $"time" - lag($"time", 1).over(w))) // diff only when the status didn't change
  .where($"status" === lit("ANCHORED"))
  .groupBy("id", "status")
  .agg(sum("tdiff").as("timeSpentAtAnchor"))
  .select("id", "timeSpentAtAnchor")
  .show(false)
Which gives:
+---+-----------------+
|id |timeSpentAtAnchor|
+---+-----------------+
|123|28 |
+---+-----------------+
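If you wanted the "last - first" per segment explicitly instead, a common trick is to derive a segment id from the difference of two row numbers (gaps-and-islands). Just a sketch, and the seg/segTime names are made up:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val wAll      = Window.partitionBy($"id").orderBy($"time")
val wByStatus = Window.partitionBy($"id", $"status").orderBy($"time")

df.withColumn("seg", row_number().over(wAll) - row_number().over(wByStatus)) // constant within each run of equal statuses
  .where($"status" === "ANCHORED")
  .groupBy($"id", $"seg")                           // one group per anchor segment
  .agg((max($"time") - min($"time")).as("segTime")) // last - first within the segment
  .groupBy($"id")
  .agg(sum($"segTime").as("timeSpentAtAnchor"))
  .show(false)
On the sample data this also yields 28 (8 for the first anchor segment, 20 for the second).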
The lag-based solution above was formed with information from this answer. And, as stated there:
Note: since this example doesn't use any partition, it could have performance problems; with your real data, it would be helpful if the problem can be partitioned by some variables.
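In this case that just means partitioning the window by the ship id, so each ship is handled independently instead of sorting everything in a single partition. A sketch of that variant (same logic as above):
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val wShip = Window.partitionBy($"id").orderBy($"time")

df.withColumn("tdiff", when($"status" === lag($"status", 1).over(wShip),
                            $"time" - lag($"time", 1).over(wShip)))
  .where($"status" === "ANCHORED")
  .groupBy("id")
  .agg(sum("tdiff").as("timeSpentAtAnchor"))
  .show(false)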
Upvotes: 2