scoder

Reputation: 2611

spark delete previous row based on the new row with some conditions match

I have a dataframe like the one below:

type   f1   f2    value
1      a    xy    11
2      b    ab    13
3      c    na    16
3      c    dir   18
3      c    ls    23

I have to delete the previous row when some conditions match against the next row.

For example, in the table above: when type == type(row-1) && f1 == f1(row-1) && abs(value - value(row-1)) < 2, I want to delete the previous row.

So my table should look like below:

type   f1   f2    value
1      a    xy    11
2      b    ab    13
3      c    dir   18
3      c    ls    30

I am thinking we can make use of the lag or lead functions, but I am not getting the exact logic.

Upvotes: 0

Views: 1101

Answers (1)

vdep

Reputation: 3590

Yes, it can be done using lead():

import org.apache.spark.sql.functions._   // lead, when, abs
import org.apache.spark.sql.expressions._

// define the window specification
val windowSpec = Window.partitionBy($"type", $"f1").orderBy($"type")

val inputDF = sc.parallelize(List((1,"a","xy",11),(2,"b","ab",13),(3,"c","na",16),(3,"c","dir",18),(3,"c","ls",23))).toDF("type","f1","f2","value")

inputDF.withColumn("leadValue", lead($"value", 1).over(windowSpec))
  .withColumn("result", when(abs($"leadValue" - $"value") <= 2, 1).otherwise(0)) // flag rows whose next row matches the condition
  .filter($"result" === 0)            // keep only the unflagged rows (lead is null on the last row of each group, so it is kept)
  .drop("leadValue", "result")        // remove the helper columns
  .orderBy($"type")
  .show

Output:

+----+---+---+-----+
|type| f1| f2|value|
+----+---+---+-----+
|   1|  a| xy|   11|
|   2|  b| ab|   13|
|   3|  c|dir|   18|
|   3|  c| ls|   23|
+----+---+---+-----+

Since we are already partitioning by type and f1, we do not need to check the equality conditions on those two columns explicitly.
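One caveat, as a hedged sketch rather than a required change: orderBy($"type") sorts each partition by a column that is constant within it, so Spark has no real sort key and the lead() pairing can be nondeterministic on larger or shuffled data. If the original row order matters, one option (assuming the same inputDF defined above) is to tag rows with monotonically_increasing_id() and order the window by that id:

```scala
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions._

// Tag each row with an increasing id so the window has a deterministic sort key.
// monotonically_increasing_id() follows the DataFrame's partition order, which
// for a small parallelized List matches the input order.
val ordered = inputDF.withColumn("rowId", monotonically_increasing_id())
val byOrder = Window.partitionBy($"type", $"f1").orderBy($"rowId")

ordered
  .withColumn("leadValue", lead($"value", 1).over(byOrder))
  // keep the last row of each group (lead is null) and any row whose
  // successor's value differs by more than 2
  .filter($"leadValue".isNull || abs($"leadValue" - $"value") > 2)
  .drop("leadValue", "rowId")
  .orderBy($"type")
  .show
```

This produces the same four rows as above while making the "previous/next" relationship explicit instead of relying on the incidental ordering of the input.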

Upvotes: 3
