Reputation: 399
I have a sparse dataset like this:
ip,ts,session
"123","1","s1"
"123","2",""
"123","3",""
"123","4",""
"123","10","s2"
"123","11",""
"123","12",""
"222","5","s6"
"222","6",""
"222","7",""
I need to make it dense like this:
ip,ts,session
"123","1","s1"
"123","2","s1"
"123","3","s1"
"123","4","s1"
"123","10","s2"
"123","11","s2"
"123","12","s2"
"222","5","s6"
"222","6","s6"
"222","7","s6"
I know how to do it using RDDs: repartition by ip, then within each partition groupBy(ip).sortBy(ts).scan()(): the scan function carries the previously computed value over to the next iteration, decides whether to keep the prior value or the current one, and passes that choice on to the next scan step.
Now I'm trying to use DataFrames only, without going back to RDDs. I was looking at window functions, but all I could come up with was the first value within a group, which is not the same. Or maybe I just do not understand how to create the correct range.
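For reference, the scan-based carry-forward described above can be sketched in plain Scala (no Spark; Event and densify are illustrative names, not part of any API):

```scala
object ForwardFill {
  case class Event(ip: String, ts: Int, session: String)

  // Forward-fill empty session values within each ip, ordered by ts --
  // the groupBy(ip).sortBy(ts).scan() idea from the question.
  def densify(events: Seq[Event]): Seq[Event] =
    events
      .groupBy(_.ip)
      .values
      .flatMap { group =>
        group
          .sortBy(_.ts)
          .scanLeft(Event("", 0, "")) { (prev, cur) =>
            // Keep the current session if present, otherwise carry the prior one
            if (cur.session.nonEmpty) cur else cur.copy(session = prev.session)
          }
          .drop(1) // drop the seed element
      }
      .toSeq
}
```

The same carry-forward logic is what the RDD scan would run per partition; the DataFrame answers below reach the identical result through joins instead.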
Upvotes: 1
Views: 205
Reputation: 399
My final code, reusing David Griffin's idea. dataWithSessionSparse is the starting dataset described in my question:
val denseSessRecordsOnly = dataWithSessionSparse
  .filter(col("sessionId") !== "")
  .select(col("ip").alias("r_ip"), col("sessionId").alias("r_sessionId"), col("ts").alias("r_ts")) // isolates the first record of every session
val dataWithSessionDense = dataWithSessionSparse
  .join(denseSessRecordsOnly, col("ip") === col("r_ip")) // relates each event to every session within its ip
  .filter(col("ts") >= col("r_ts")) // keeps only the sessions that started at or before the event
  .groupBy(col("ip"), col("ts")).agg(max(col("r_ts")).alias("r_ts")) // for each event, keeps the most recent session start
  .join(
    denseSessRecordsOnly.select(col("r_ip").alias("l_ip"), col("r_sessionId").alias("sessionId"), col("r_ts").alias("l_ts")),
    col("r_ts") === col("l_ts") && col("ip") === col("l_ip")) // recovers the sessionId belonging to that session start
  .select(col("ip"), col("ts"), col("sessionId"))
Upvotes: 0
Reputation: 13927
You can do it with multiple self-joins. Basically, you want to create a data set of all the "start session" records (filter($"session" !== "")) and then join that against the original data set, filtering out the records where the "session start" was later than the current event (filter($"ts" >= $"r_ts")). Then you want to find the max($"r_ts") for each (ip, ts) pair. The last join is just to retrieve the session value from the original data set.
data.join(
  data.filter($"session" !== "").select(
    $"ip" as "r_ip", $"session" as "r_session", $"ts" as "r_ts"
  ),
  $"ip" === $"r_ip"
)
.filter($"ts" >= $"r_ts")
.groupBy($"ip", $"ts")
.agg(max($"r_ts") as "r_ts")
.join(
  data.select($"session", $"ts" as "l_ts"),
  $"r_ts" === $"l_ts"
)
.select($"ip", $"ts", $"session")
BTW, my solution assumes that the column ts is something like a transaction sequence -- an incrementing Int value. If it's not, you can use my DataFrame-ified zipWithIndex solution to create a column that will serve the same purpose.
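As a hedged illustration of what such an index buys you (plain Scala, not the linked Spark solution; SurrogateTs and withOrdinal are made-up names): zipWithIndex attaches a monotonically increasing ordinal in arrival order, and that ordinal can stand in for ts wherever the joins above compare timestamps.

```scala
object SurrogateTs {
  // Attach an incrementing ordinal to each row; it plays the role of an
  // ordered ts column when the real ts is not an incrementing value.
  def withOrdinal[A](rows: Seq[A]): Seq[(A, Long)] =
    rows.zipWithIndex.map { case (row, i) => (row, i.toLong) }
}
```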
Upvotes: 2