hassanami

Reputation: 69

How to mark overlapping time range in PySpark dataframe?

I want to mark rows whose [start_date, end_date] range overlaps another row's range for the same key. For example, given a dataframe like:

+---+-------------------+-------------------+
|key|start_date         |end_date           |
+---+-------------------+-------------------+
|A  |2022-01-11 00:00:00|8888-12-31 00:00:00|
|B  |2020-01-01 00:00:00|2022-02-10 00:00:00|
|B  |2019-02-08 00:00:00|2020-02-15 00:00:00|
|B  |2022-02-16 00:00:00|2022-12-15 00:00:00|
|C  |2018-01-01 00:00:00|2122-02-10 00:00:00|
+---+-------------------+-------------------+

the resulting dataframe would have the first and second B records flagged (valid = false), since their date ranges overlap. Like this:

+---+-------------------+-------------------+-----+
|key|start_date         |end_date           |valid|
+---+-------------------+-------------------+-----+
|A  |2022-01-11 00:00:00|8888-12-31 00:00:00|true |
|B  |2020-01-01 00:00:00|2022-02-10 00:00:00|false|
|B  |2019-02-08 00:00:00|2020-02-15 00:00:00|false|
|B  |2022-02-16 00:00:00|2022-12-15 00:00:00|true |
|C  |2018-01-01 00:00:00|2122-02-10 00:00:00|true |
+---+-------------------+-------------------+-----+

Upvotes: 1

Views: 1125

Answers (2)

wwnde

Reputation: 26676

from pyspark.sql import Window
from pyspark.sql.functions import array, col, collect_list, expr, to_date

df = (df.select('key', *[to_date(x).alias(x) for x in df.columns if x != 'key'])  # coerce the timestamp columns to dates
      .withColumn('valid', collect_list(array(col('start_date'), col('end_date'))).over(Window.partitionBy('key')))  # collect the list of [start, end] intervals per key
      .withColumn('valid', expr("array_contains(transform(valid,(x,i)->start_date<(x[0])),true)"))  # true for rows whose start_date is earlier than the start of some interval collected for the same key
     )

+---+----------+----------+-----+
|key|start_date|end_date  |valid|
+---+----------+----------+-----+
|A  |2022-01-11|8888-12-31|false|
|B  |2020-01-01|2022-02-10|true |
|B  |2019-02-08|2020-02-15|true |
|B  |2022-02-16|2022-12-15|false|
|C  |2018-01-01|2122-02-10|false|
+---+----------+----------+-----+
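
This flips the meaning of the asked-for column (overlapping rows come out true) and coerces the timestamps to dates. A minimal sketch of a pairwise check over the collected intervals that returns the requested valid flag directly, assuming Spark 2.4+ for the filter higher-order function (the comparisons work on the original timestamp columns as well as on dates), could be:

from pyspark.sql import functions as F, Window as W

# Two ranges [s1, e1] and [s2, e2] overlap when s1 <= e2 AND s2 <= e1.
# A row's own interval always matches itself, so more than one match means
# at least one other interval for the same key overlaps it.
w = W.partitionBy('key')
df = (df
      .withColumn('intervals', F.collect_list(F.array('start_date', 'end_date')).over(w))
      .withColumn('valid', F.expr(
          "size(filter(intervals, x -> x[0] <= end_date AND x[1] >= start_date)) <= 1"))
      .drop('intervals'))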

Upvotes: 0

ZygD

Reputation: 24366

Here I've added scripts to combine overlapping date ranges. In your case I've modified the last script slightly: instead of the final groupBy that combines overlapping ranges, I've added a window function which just flags them.

Test input:

from pyspark.sql import functions as F, Window as W
df = spark.createDataFrame(
    [('A', '2022-01-11 00:00:00', '8888-12-31 00:00:00'),
     ('B', '2020-01-01 00:00:00', '2022-02-10 00:00:00'),
     ('B', '2019-02-08 00:00:00', '2020-02-15 00:00:00'),
     ('B', '2022-02-16 00:00:00', '2022-12-15 00:00:00'),
     ('C', '2018-01-01 00:00:00', '2122-02-10 00:00:00')],
    ['key', 'start_date', 'end_date'])

Script:

w1 = W.partitionBy("key").orderBy("start_date")
w2 = W.partitionBy("key", "contiguous_grp")
max_end = F.max("end_date").over(w1)
contiguous = F.when(F.datediff(F.lag(max_end).over(w1), "start_date") < 0, 1).otherwise(0)
df = (df
    .withColumn("contiguous_grp", F.sum(contiguous).over(w1))
    .withColumn("valid", (F.count(F.lit(1)).over(w2)) == 1)
    .drop("contiguous_grp")
)
df.show()
# +---+-------------------+-------------------+-----+
# |key|         start_date|           end_date|valid|
# +---+-------------------+-------------------+-----+
# |  A|2022-01-11 00:00:00|8888-12-31 00:00:00| true|
# |  B|2019-02-08 00:00:00|2020-02-15 00:00:00|false|
# |  B|2020-01-01 00:00:00|2022-02-10 00:00:00|false|
# |  B|2022-02-16 00:00:00|2022-12-15 00:00:00| true|
# |  C|2018-01-01 00:00:00|2122-02-10 00:00:00| true|
# +---+-------------------+-------------------+-----+
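
The answer mentions that its original script ends with a groupBy that combines overlapping ranges rather than flagging them. A rough sketch of that variant, reusing w1 and contiguous from the script above (my reconstruction, not the answer's exact code):

# Keep contiguous_grp and collapse each block into one merged range
# (earliest start, latest end) instead of flagging rows.
merged = (df
    .withColumn("contiguous_grp", F.sum(contiguous).over(w1))
    .groupBy("key", "contiguous_grp")
    .agg(F.min("start_date").alias("start_date"),   # earliest start in the block
         F.max("end_date").alias("end_date"))       # latest end in the block
    .drop("contiguous_grp"))
merged.show()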

Upvotes: 1
