Chinti

Reputation: 303

Reducing a Spark dataframe based on when the value of a column changes

I want Spark Scala code that reduces the size of a dataframe. There is a column "Status" and another column "Time". I want only the rows where the status changes.

Current Dataframe

Time  Status
100    Running
200    Running
300    Stopped
400    Stopped
500    Running
600    Running 
700    Stopped 
800    Ended

Now I want a dataframe that has only the rows where the status changes. So the required dataframe would be:

Time  Status
100    Running
300    Stopped
500    Running
700    Stopped 
800    Ended


Upvotes: 0

Views: 241

Answers (1)

Sanket9394

Reputation: 2091


import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{lag, lit}
import spark.implicits._

// Constant key so the whole frame falls into a single window partition
val df = currentDf.withColumn("dummy_id", lit(1))

val windowSpec = Window.partitionBy("dummy_id").orderBy("Time")

// Pull the previous row's Status alongside each row
val df2 = df.withColumn("prev_status", lag("Status", 1).over(windowSpec))

// Keep the first row (prev_status is null) and every row where Status changed
val finalDf = df2.filter($"prev_status".isNull || $"Status" =!= $"prev_status")
  .drop("prev_status")

Although this will work, the constant partition key brings the WHOLE data to one executor, so it will not be scalable.

In case your data is small, you can go with this.

Alternatively, there is a lengthier way using RDDs and mapPartitions, where you can increase the parallelism; a rough sketch follows. It is more complex but more performant.
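
A minimal sketch of that idea, using the typed Dataset API's mapPartitions rather than raw RDDs, and assuming the frame has exactly the two columns (Time: Int, Status: String) from the question:

import org.apache.spark.sql.Dataset

// Assumes the imports and spark.implicits._ from the snippet above
val ds: Dataset[(Int, String)] = currentDf.as[(Int, String)]

// Range-partition by Time so each partition holds a contiguous, sorted
// slice of the timeline; the work stays spread across executors
val sorted = ds.repartitionByRange($"Time").sortWithinPartitions($"Time")

// Within each partition, keep the first row plus every row whose Status
// differs from the previous row's Status
val changes = sorted.mapPartitions { rows =>
  var prev: Option[String] = None
  rows.filter { case (_, status) =>
    val keep = !prev.contains(status)
    prev = Some(status)
    keep
  }
}

// Caveat: the first row of every partition is always kept, so an extra pass
// comparing it with the previous partition's last Status is needed for exact
// results across partition boundaries — this is where the complexity lives.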

Upvotes: 1
