Reputation: 3283
I have a dataframe like
+------+-------------------+------+
|group | time| label|
+------+-------------------+------+
| a|2020-01-01 10:49:00|first |
| a|2020-01-01 10:51:00|second|
| a|2020-01-01 12:49:00|first |
| b|2020-01-01 12:44:00|second|
| b|2020-01-01 12:46:00|first |
| c|2020-01-01 12:46:00|third |
+------+-------------------+------+
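For reference, a minimal sketch to build this sample data (assuming time is meant to be a timestamp column and spark is the active SparkSession):
import org.apache.spark.sql.functions.{col, to_timestamp}
import spark.implicits._  // for .toDF on a Seq

// sample rows matching the table above; the time strings are cast to timestamps
val df = Seq(
  ("a", "2020-01-01 10:49:00", "first"),
  ("a", "2020-01-01 10:51:00", "second"),
  ("a", "2020-01-01 12:49:00", "first"),
  ("b", "2020-01-01 12:44:00", "second"),
  ("b", "2020-01-01 12:46:00", "first"),
  ("c", "2020-01-01 12:46:00", "third")
).toDF("group", "time", "label")
  .withColumn("time", to_timestamp(col("time")))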
I would like to drop all rows where, for each group, the label "first" is more recent than the label "second" or "third". For instance, in group a the row with label "first" and time 2020-01-01 12:49:00 should be dropped, because there is an older row in the same group with the label "second".
The desired output would be:
+------+-------------------+------+
|group | time| label|
+------+-------------------+------+
| a|2020-01-01 10:49:00|first |
| a|2020-01-01 10:51:00|second|
| b|2020-01-01 12:44:00|second|
| c|2020-01-01 12:46:00|third |
+------+-------------------+------+
A window function partitioned by group would let me work within each group, but how do I implement the filter on the label?
Upvotes: 1
Views: 258
Reputation: 42392
You can get, for each row, the most recent time so far whose label is not "first", and then filter using that column:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, last, when}

val df2 = df.withColumn(
  "non_first_time",
  // latest time seen so far within the group (ordered by time)
  // whose label is not "first"; ignoreNulls = true skips the null
  // values produced by "first" rows
  last(
    when(col("label") =!= "first", col("time")),
    true
  ).over(
    Window.partitionBy("group").orderBy("time")
  )
).filter("""
  label != 'first' or
  (label = 'first' and (non_first_time > time or non_first_time is null))
""").drop("non_first_time")
df2.show
+-----+-------------------+------+
|group| time| label|
+-----+-------------------+------+
| c|2020-01-01 12:46:00| third|
| b|2020-01-01 12:44:00|second|
| a|2020-01-01 10:49:00| first|
| a|2020-01-01 10:51:00|second|
+-----+-------------------+------+
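Note that last with ignoreNulls = true (the second argument) runs over the default window frame, which, when an orderBy is specified, spans from the start of the partition up to the current row. So non_first_time holds the latest time seen so far in the group with a label other than "first", and a "first" row survives the filter only when no non-"first" time has occurred at or before it.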
Upvotes: 1