fletchr

Reputation: 646

Apache Spark (Scala) Aggregation across time with various groups

What I'm trying to accomplish is to calculate the total time a ship spends at anchor. The data I'm dealing with is time-series in nature. Throughout a ship's journey from Point A -> Point B it can stop and start multiple times.

Basically, for each id (the ship's unique id) I want to calculate the total time spent at anchor (status === "ANCHORED"). For each "anchor" time period, take the last timestamp and subtract the first timestamp from it (or vice versa; I'll just take the absolute value). I can do this easily if a ship only stops once in its journey (window function), but I'm having trouble when it stops and starts multiple times throughout a journey. Can a window function handle this?

Here is an example of the data I'm dealing with and expected output:

    import spark.implicits._ // for .toDF and the $"col" syntax

    val df = Seq(
      (123, "UNDERWAY", 0),
      (123, "ANCHORED", 12), // first anchored (first time around)
      (123, "ANCHORED", 20), // take this timestamp and sub from previous
      (123, "UNDERWAY", 32),
      (123, "UNDERWAY", 44),
      (123, "ANCHORED", 50), // first anchored (second time around)
      (123, "ANCHORED", 65),
      (123, "ANCHORED", 70), // take this timestamp and sub from previous
      (123, "ARRIVED", 79)
    ).toDF("id", "status", "time")

+---+--------+----+
|id |status  |time|
+---+--------+----+
|123|UNDERWAY|0   |
|123|ANCHORED|12  |
|123|ANCHORED|20  |
|123|UNDERWAY|32  |
|123|UNDERWAY|44  |
|123|ANCHORED|50  |
|123|ANCHORED|65  |
|123|ANCHORED|70  |
|123|ARRIVED |79  |
+---+--------+----+

// the resulting output I need is as follows (aggregation of total time spent at anchor)
// the ship spent 8 hours at anchor the first time, and then spent 
// 20 hours at anchor the second time. So total time is 28 hours
+---+-----------------+
|id |timeSpentAtAnchor|
+---+-----------------+
|123|28               |
+---+-----------------+

Each "segment" the ship is at anchor I want to calculate the time spent at anchor and then add all those segments up to get the total time spent at anchor.

Upvotes: 0

Views: 75

Answers (1)

Travis Hegner

Reputation: 2495

I'm new to Window functions, so this could possibly be done better, but here is what I came up with:

This solution only looks at "this - previous", as opposed to the "last - first" within each "group" of statuses. The net effect should be the same, though, since it sums them all together anyway.

    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions._ // when, lag, lit, sum, min, max

    // No partition: all rows are sorted within a single partition (see note below)
    val w = Window.orderBy($"time")

    // tdiff = gap to the previous row, kept only when the status did not change,
    // i.e. only for rows that continue an existing run of the same status
    df.withColumn("tdiff", when($"status" === lag($"status", 1).over(w), $"time" - lag($"time", 1).over(w)))
      .where($"status" === lit("ANCHORED"))
      .groupBy("id", "status")
      .agg(sum("tdiff").as("timeSpentAtAnchor"))
      .select("id", "timeSpentAtAnchor")
      .show(false)

Which gives:

+---+-----------------+
|id |timeSpentAtAnchor|
+---+-----------------+
|123|28               |
+---+-----------------+
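
For comparison, here is a minimal sketch of the "last - first within each group" variant mentioned above, reusing the window w and imports from the code above. The statusChanged and segment column names are my own illustrative choices, not from the original: a running sum of status-change flags gives every consecutive run of statuses its own segment id, and each ANCHORED segment then contributes its last timestamp minus its first timestamp.

    // Flag rows where the status differs from the previous row, then take a
    // running sum of those flags so each consecutive run of statuses gets
    // its own segment id (1, 2, 3, ...).
    val withSegments = df
      .withColumn("statusChanged",
        when(lag($"status", 1).over(w).isNull ||
             $"status" =!= lag($"status", 1).over(w), 1).otherwise(0))
      .withColumn("segment", sum($"statusChanged").over(w))

    // Per ANCHORED segment: duration = last timestamp - first timestamp;
    // then sum the segment durations per ship.
    withSegments
      .where($"status" === lit("ANCHORED"))
      .groupBy($"id", $"segment")
      .agg((max($"time") - min($"time")).as("segmentDuration"))
      .groupBy($"id")
      .agg(sum($"segmentDuration").as("timeSpentAtAnchor"))
      .show(false)

For the sample data this produces the same total of 28.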

The answer was formed with information from this answer. And, as stated there:

Note: since this example doesn't use any partition, it could have performance problems. In your real data, it would help if your problem can be partitioned by some variables.
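
A minimal sketch of that suggestion, assuming each ship's journey can be treated independently (which the per-id aggregation suggests): partition the window by id so Spark sorts each ship's rows separately instead of pulling the whole dataset into one partition.

    import org.apache.spark.sql.expressions.Window

    // Partitioning by id lets Spark process each ship independently,
    // avoiding a single-partition sort over the entire dataset.
    val w = Window.partitionBy($"id").orderBy($"time")

The rest of the pipeline is unchanged, since no anchor segment spans two ids.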

Upvotes: 2
