Michael Zeltser

Reputation: 399

How to carry over the last non-empty value to subsequent rows using a Spark DataFrame

I've a sparse dataset like this:

  ip,ts,session
  "123","1","s1"
  "123","2",""
  "123","3",""
  "123","4",""
  "123","10","s2"
  "123","11",""
  "123","12",""
  "222","5","s6"
  "222","6",""
  "222","7",""

I need to make it dense like this:

  ip,ts,session
  "123","1","s1"
  "123","2","s1"
  "123","3","s1"
  "123","4","s1"
  "123","10","s2"
  "123","11","s2"
  "123","12","s2"
  "222","5","s6"
  "222","6","s6"
  "222","7","s6"

I know how to do it with the RDD API: re-partition by ip and, within each partition, groupBy(ip).sortBy(ts).scan(). The scan function carries the previously computed value into the next iteration, decides whether to keep the prior value or the current one, and passes that choice on to the next scan step.
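
Roughly what I mean on the RDD side, just as a sketch (the tuple shape and names are illustrative):

// sparseRdd: RDD[(String, String, String)] holding (ip, ts, session)
val denseRdd = sparseRdd
  .groupBy { case (ip, _, _) => ip }                      // one group per ip
  .flatMap { case (_, rows) =>
    rows.toSeq
      .sortBy { case (_, ts, _) => ts.toInt }             // order the group's events by ts
      .scanLeft(("", "", "")) { case ((_, _, prev), (ip, ts, session)) =>
        (ip, ts, if (session.nonEmpty) session else prev) // carry the last non-empty session forward
      }
      .drop(1)                                            // drop the scanLeft seed
  }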

Now I'm trying to do it with DataFrames only, without dropping back to the RDD API. I was looking at Window functions, but all I could come up with is the first value within the group, which is not the same. Or I just do not understand how to define the correct range.
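
For reference, the dead end looked roughly like this (just a sketch, using dataWithSessionSparse as the starting DataFrame): it only repeats the first session value of each ip partition, instead of the last non-empty value seen so far.

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, first}

val w = Window.partitionBy(col("ip")).orderBy(col("ts"))
val attempt = dataWithSessionSparse.withColumn("session", first(col("session")).over(w))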

Upvotes: 1

Views: 205

Answers (2)

Michael Zeltser

Reputation: 399

My final code, reusing David Griffin's idea (dataWithSessionSparse is the starting dataset described in my question):

val denseSessRecordsOnly = dataWithSessionSparse
  .filter(col("sessionId") !== "")
  .select(col("ip").alias("r_ip"), col("sessionId").alias("r_sessionId"), col("ts").alias("r_ts")) // isolates the first record of every session

val dataWithSessionDense = dataWithSessionSparse
  .join(denseSessRecordsOnly, col("ip") === col("r_ip")) // explodes each event to relate to all sessions within its ip
  .filter(col("ts") >= col("r_ts")) // filters the exploded dataset so each event relates only to sessions that started at or before the event
  .groupBy(col("ip"), col("ts")).agg(max(col("r_ts")).alias("r_ts")) // keeps the latest such session start per event
  .join(
    denseSessRecordsOnly.select(col("r_ip").alias("l_ip"), col("r_sessionId").alias("sessionId"), col("r_ts").alias("l_ts")),
    col("r_ts") === col("l_ts") && col("ip") === col("l_ip")) // looks the sessionId back up by its session-start ts
  .select(col("ip"), col("ts"), col("sessionId"))

Upvotes: 0

David Griffin

Reputation: 13927

You can do it with multiple self-joins. Basically, you want to create a data set of all the "session start" records (filter($"session" !== "")) and then join that against the original data set, filtering out the records where the session start is later than the current record's ts (filter($"ts" >= $"r_ts")). Then you find the max($"r_ts") for each (ip, ts). The last join just retrieves the session value from the original data set.

data.join(
  data.filter($"session" !== "").select(    // session-start records only
    $"ip" as "r_ip", $"session" as "r_session", $"ts" as "r_ts"
  ),
  $"ip" === $"r_ip"
)
.filter($"ts" >= $"r_ts")                   // keep session starts at or before this record
.groupBy($"ip",$"ts")
.agg(max($"r_ts") as "r_ts")                // latest such session start per record
.join(
  data.select($"session",$"ts" as "l_ts"),  // look the session value back up by its start ts
  $"r_ts" === $"l_ts"
)
.select($"ip",$"ts",$"session")

BTW, my solution assumes that the column ts is something like a transaction sequence -- that it is an incrementing Int value. If it's not, you can use my DataFrame-ified zipWithIndex solution to create a column that will serve the same purpose.
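
A minimal sketch of what such a DataFrame-ified zipWithIndex can look like (the helper and column names here are illustrative): drop to the RDD just long enough to assign an increasing row index, then rebuild the DataFrame with the extra column.

import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.types.{LongType, StructField, StructType}

def dfZipWithIndex(df: DataFrame, idxCol: String = "row_idx"): DataFrame = {
  // assign a stable, incrementing index to every row
  val rowsWithIdx = df.rdd.zipWithIndex.map { case (row, idx) =>
    Row.fromSeq(row.toSeq :+ idx)
  }
  // rebuild the DataFrame with the index appended to the original schema
  df.sqlContext.createDataFrame(
    rowsWithIdx,
    StructType(df.schema.fields :+ StructField(idxCol, LongType, nullable = false))
  )
}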

Upvotes: 2
