Galuoises

Reputation: 3283

Spark: filter in each group

I have a dataframe like

+------+-------------------+------+
|group |               time| label|
+------+-------------------+------+
|     a|2020-01-01 10:49:00|first |
|     a|2020-01-01 10:51:00|second|
|     a|2020-01-01 12:49:00|first |
|     b|2020-01-01 12:44:00|second|
|     b|2020-01-01 12:46:00|first |
|     c|2020-01-01 12:46:00|third |
+------+-------------------+------+
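For reproducibility, the sample data could be built along these lines (just a sketch, assuming a spark-shell style session where spark.implicits._ is available):

import spark.implicits._
import org.apache.spark.sql.functions.{col, to_timestamp}

// sample data matching the table above
val df = Seq(
  ("a", "2020-01-01 10:49:00", "first"),
  ("a", "2020-01-01 10:51:00", "second"),
  ("a", "2020-01-01 12:49:00", "first"),
  ("b", "2020-01-01 12:44:00", "second"),
  ("b", "2020-01-01 12:46:00", "first"),
  ("c", "2020-01-01 12:46:00", "third")
).toDF("group", "time", "label")
  .withColumn("time", to_timestamp(col("time")))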

I would like to drop every row labelled first that, within its group, is more recent than a row labelled second or third. For instance, in group a the row with first and 2020-01-01 12:49:00 should be dropped because there is an older row labelled second.

The desired output would be:

+------+-------------------+------+
|group |               time| label|
+------+-------------------+------+
|     a|2020-01-01 10:49:00|first |
|     a|2020-01-01 10:51:00|second|
|     b|2020-01-01 12:44:00|second|
|     c|2020-01-01 12:46:00|third |
+------+-------------------+------+

A window function partitioned by group would let me work within each group, but how do I implement the filter on the label?
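So far I only have the window itself, something like this sketch; the condition on label is the part I'm missing:

import org.apache.spark.sql.expressions.Window

// per-group window ordered by time; the filter on label is the open question
val byGroup = Window.partitionBy("group").orderBy("time")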

Upvotes: 1

Views: 258

Answers (1)

mck

Reputation: 42392

You can compute, for each row, the most recent time seen so far in the group whose label is not "first", and filter using that column:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, last, when}

val df2 = df.withColumn(
    "non_first_time",
    // latest time so far in the group whose label is not "first"
    // (last with ignoreNulls = true over the default running frame)
    last(
        when(col("label") =!= "first", col("time")),
        true
    ).over(
        Window.partitionBy("group").orderBy("time")
    )
).filter("""
    label != 'first' or
    (label = 'first' and (non_first_time > time or non_first_time is null))
""").drop("non_first_time")

df2.show
+-----+-------------------+------+
|group|               time| label|
+-----+-------------------+------+
|    c|2020-01-01 12:46:00| third|
|    b|2020-01-01 12:44:00|second|
|    a|2020-01-01 10:49:00| first|
|    a|2020-01-01 10:51:00|second|
+-----+-------------------+------+
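A note on how this works: last(..., true) ignores nulls and runs over the default frame (unbounded preceding up to the current row), so non_first_time holds the latest time of a second/third row seen so far within the group. The filter then keeps every non-first row, and keeps a first row only when no such time exists yet (non_first_time is null) or it is later than the row's own time.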

Upvotes: 1
