Reputation: 1030
Below is the DataFrame:
val df = Seq(
  ("Alice", 1, "2016-05-01"),
  ("Alice", 1, "2016-05-03"),
  ("Alice", 2, "2016-05-04"),
  ("Bob", 3, "2016-05-01")
).toDF("name", "value", "date")
If the next row's df("value") within the same partition (partitioned by name) is the same, I want the result to be "NOCHANGE"; otherwise, I want to subtract 1 day from the date column.
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
val windowSpec = Window.partitionBy("name").orderBy("date")
df.withColumn("result",
  when(lead(df("value"), 1).over(windowSpec) === df("value"), "NOCHANGE")
    .otherwise(date_sub(df("date"), 1))
).show()
This statement's output is
+-----+-----+----------+----------+
| name|value| date| result|
+-----+-----+----------+----------+
|Alice| 1|2016-05-01| NOCHANGE|
|Alice| 1|2016-05-03|2016-05-02|
|Alice| 2|2016-05-04|2016-05-03|
| Bob| 3|2016-05-01|2016-04-30|
+-----+-----+----------+----------+
But the expected output is
+-----+-----+----------+----------+
| name|value| date| result|
+-----+-----+----------+----------+
|Alice| 1|2016-05-01| NOCHANGE|
|Alice| 1|2016-05-03|2016-05-02|
|Alice| 2|2016-05-04| NOCHANGE| //as it is last value of Alice partition
| Bob| 3|2016-05-01| NOCHANGE|//as no leading value in Bob partition
+-----+-----+----------+----------+
Am I doing anything wrong here?
Additionally, if I have multiple columns to compare (value1, value2, value3), what would be the best approach to compare consecutive rows?
Upvotes: 1
Views: 2885
Reputation: 13927
It's because lead(..., 1) returns null on the last row of each partition, and you are not handling that case. Comparing null with === yields null rather than true, so when(...) falls through to otherwise(...). See this:
df.withColumn("result" , lead(col("value"), 1).over(windowSpec)).show
+-----+-----+----------+------+
| name|value| date|result|
+-----+-----+----------+------+
|Alice| 1|2016-05-01| 1|
|Alice| 1|2016-05-03| 2|
|Alice| 2|2016-05-04| null|
| Bob| 3|2016-05-01| null|
+-----+-----+----------+------+
Try this instead:
df.withColumn("result" , lead(col("value"), 1).over(windowSpec))
.withColumn("result",
when(col("result") === col("value") || col("result").isNull, "NOCHANGE")
.otherwise(date_sub(col("date"), 1))
).show
+-----+-----+----------+----------+
| name|value| date| result|
+-----+-----+----------+----------+
|Alice| 1|2016-05-01| NOCHANGE|
|Alice| 1|2016-05-03|2016-05-02|
|Alice| 2|2016-05-04| NOCHANGE|
| Bob| 3|2016-05-01| NOCHANGE|
+-----+-----+----------+----------+
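As an alternative to the explicit isNull check, the null from lead can be replaced before the comparison. The following is only a sketch under assumptions: the object name LeadWithCoalesce, the helper withResult, and the local SparkSession settings are made up for illustration, and the explicit to_date and cast("string") calls are additions that keep both branches of when/otherwise the same type. It also assumes value itself is never null.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{coalesce, col, date_sub, lead, to_date, when}

object LeadWithCoalesce {
  // Hypothetical helper; not part of the original answer.
  def withResult(df: DataFrame): DataFrame = {
    val windowSpec = Window.partitionBy("name").orderBy("date")
    // lead(...) is null on the last row of a partition; coalesce then falls back
    // to the row's own value, so the equality below is true for that row.
    val nextOrSame = coalesce(lead(col("value"), 1).over(windowSpec), col("value"))
    df.withColumn("result",
      when(nextOrSame === col("value"), "NOCHANGE")
        // Cast to string so both branches of when/otherwise share one type.
        .otherwise(date_sub(to_date(col("date")), 1).cast("string")))
  }

  def main(args: Array[String]): Unit = {
    // Local session for illustration; master and app name are assumptions.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("lead-coalesce-sketch")
      .getOrCreate()
    import spark.implicits._

    // Same sample data as in the question.
    val df = Seq(
      ("Alice", 1, "2016-05-01"),
      ("Alice", 1, "2016-05-03"),
      ("Alice", 2, "2016-05-04"),
      ("Bob", 3, "2016-05-01")
    ).toDF("name", "value", "date")

    withResult(df).show()
    spark.stop()
  }
}
```

This keeps everything in a single withColumn call and avoids the intermediate column, at the cost of being slightly less explicit about why the last row is treated as unchanged.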
If you have multiple columns to compare, you are going to need multiple result columns, and then use && to create your final result. Maybe something like:
val df2 = ....toDF("name", "value1", "value2", "date")
df2.withColumn("nextValue1", lead(col("value1"), 1).over(windowSpec))
.withColumn("nextValue2", lead(col("value2"), 1).over(windowSpec))
.withColumn("result",
when(
(col("nextValue1") === col("value1") && col("nextValue2") === col("value2")) || col("nextValue1").isNull,
"NOCHANGE"
).otherwise(date_sub(col("date"), 1))
).drop("nextValue1").drop("nextValue2")
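When the number of columns grows, the per-column comparisons can be built from a list of names instead of being written out by hand. This is a rough sketch, not a definitive implementation: the object MultiColumnLead, the helper withResult, the sample rows, and the local SparkSession settings are all made up for illustration. Like the snippet above, it treats a null lead of the first compared column as "no next row", which assumes that column is never null in the data.

```scala
import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, date_sub, lead, to_date, when}

object MultiColumnLead {
  // Hypothetical helper: "NOCHANGE" when every column in `cols` equals its value
  // in the next row of the partition, or when there is no next row.
  def withResult(df: DataFrame, cols: Seq[String]): DataFrame = {
    require(cols.nonEmpty, "at least one column to compare is needed")
    val windowSpec = Window.partitionBy("name").orderBy("date")

    // One equality test per compared column, combined with &&.
    val allSame: Column = cols
      .map(c => lead(col(c), 1).over(windowSpec) === col(c))
      .reduce(_ && _)

    // Assumption: the first compared column has no nulls, so a null lead
    // means the current row is the last one in its partition.
    val isLastRow: Column = lead(col(cols.head), 1).over(windowSpec).isNull

    df.withColumn("result",
      when(allSame || isLastRow, "NOCHANGE")
        // Cast to string so both branches of when/otherwise share one type.
        .otherwise(date_sub(to_date(col("date")), 1).cast("string")))
  }

  def main(args: Array[String]): Unit = {
    // Local session for illustration; master and app name are assumptions.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("multi-column-lead-sketch")
      .getOrCreate()
    import spark.implicits._

    // Made-up sample rows with two value columns.
    val df2 = Seq(
      ("Alice", 1, 10, "2016-05-01"),
      ("Alice", 1, 10, "2016-05-03"),
      ("Alice", 1, 20, "2016-05-04"),
      ("Bob", 3, 30, "2016-05-01")
    ).toDF("name", "value1", "value2", "date")

    withResult(df2, Seq("value1", "value2")).show()
    spark.stop()
  }
}
```

Because the window expressions are used inline, no temporary nextValue columns are created and nothing has to be dropped afterwards.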
Upvotes: 2