Reputation: 69
I want to flag rows whose start and end times overlap with those of another row with the same key. For example, given a dataframe like:
+---+-------------------+-------------------+
|key|start_date |end_date |
+---+-------------------+-------------------+
|A |2022-01-11 00:00:00|8888-12-31 00:00:00|
|B |2020-01-01 00:00:00|2022-02-10 00:00:00|
|B |2019-02-08 00:00:00|2020-02-15 00:00:00|
|B |2022-02-16 00:00:00|2022-12-15 00:00:00|
|C |2018-01-01 00:00:00|2122-02-10 00:00:00|
+---+-------------------+-------------------+
the resulting dataframe would have the first and second B records flagged, since their start and end times overlap. Like this:
+---+-------------------+-------------------+-----+
|key|start_date |end_date |valid|
+---+-------------------+-------------------+-----+
|A |2022-01-11 00:00:00|8888-12-31 00:00:00|true |
|B |2020-01-01 00:00:00|2022-02-10 00:00:00|false|
|B |2019-02-08 00:00:00|2020-02-15 00:00:00|false|
|B |2022-02-16 00:00:00|2022-12-15 00:00:00|true |
|C |2018-01-01 00:00:00|2122-02-10 00:00:00|true |
+---+-------------------+-------------------+-----+
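For reference, two ranges overlap exactly when each starts before the other ends. As a plain predicate (a sketch; the helper name is mine):

def overlaps(s1, e1, s2, e2):
    return s1 < e2 and s2 < e1

The first two B rows satisfy it (2020-01-01 < 2020-02-15 and 2019-02-08 < 2022-02-10), while the third B row overlaps neither of them.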
Upvotes: 1
Views: 1125
Reputation: 26676
from pyspark.sql import Window
from pyspark.sql.functions import array, col, collect_list, expr, to_date

df = (df.select('key', *[to_date(x).alias(x) for x in df.columns if x != 'key'])  # coerce timestamps to dates
      .withColumn('valid', collect_list(array(col('start_date'), col('end_date'))).over(Window.partitionBy('key')))  # collect the key's [start_date, end_date] intervals onto each row
      .withColumn('valid', expr("array_contains(transform(valid, (x, i) -> start_date < x[0]), true)"))  # true when some interval of the key starts after this row's start date
)
+---+----------+----------+-----+
|key|start_date|end_date |valid|
+---+----------+----------+-----+
|A |2022-01-11|8888-12-31|false|
|B |2020-01-01|2022-02-10|true |
|B |2019-02-08|2020-02-15|true |
|B |2022-02-16|2022-12-15|false|
|C |2018-01-01|2122-02-10|false|
+---+----------+----------+-----+
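Note that valid here is true for the flagged (overlapping) rows, which is the opposite of the convention in the question, and that the start_date < x[0] test only checks whether some interval of the key starts later; that matches this sample but is not a general pairwise overlap test. A stricter variant of the same collected-intervals idea (a sketch, assuming the date-coerced frame above; the intervals column name is mine) that reproduces the asked-for valid column:

from pyspark.sql import Window
from pyspark.sql.functions import array, col, collect_list, expr

df = (df.withColumn('intervals', collect_list(array(col('start_date'), col('end_date'))).over(Window.partitionBy('key')))
      # every row "overlaps" itself, so a genuine overlap exists only when more than one interval qualifies
      .withColumn('valid', expr("size(filter(intervals, x -> start_date < x[1] and x[0] < end_date)) <= 1"))
      .drop('intervals')
)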
Upvotes: 0
Reputation: 24366
Here I've added scripts to combine overlapping date ranges. In your case, I modified the last script slightly: instead of the final groupBy that merges overlapping ranges, I added a window function which just flags them.
Test input:
from pyspark.sql import functions as F, Window as W
df = spark.createDataFrame(
    [('A', '2022-01-11 00:00:00', '8888-12-31 00:00:00'),
     ('B', '2020-01-01 00:00:00', '2022-02-10 00:00:00'),
     ('B', '2019-02-08 00:00:00', '2020-02-15 00:00:00'),
     ('B', '2022-02-16 00:00:00', '2022-12-15 00:00:00'),
     ('C', '2018-01-01 00:00:00', '2122-02-10 00:00:00')],
    ['key', 'start_date', 'end_date'])
Script:
w1 = W.partitionBy("key").orderBy("start_date")
w2 = W.partitionBy("key", "contiguous_grp")
df = (df
    .withColumn("max_end", F.max("end_date").over(w1))  # running max of end_date
    # a new contiguous group starts when the previous running max ends before this row starts;
    # intermediates are materialized as columns so no window function is nested inside another
    .withColumn("contiguous", F.when(F.datediff(F.lag("max_end").over(w1), "start_date") < 0, 1).otherwise(0))
    .withColumn("contiguous_grp", F.sum("contiguous").over(w1))
    .withColumn("valid", F.count(F.lit(1)).over(w2) == 1)  # a group of one row has no overlaps
    .drop("max_end", "contiguous", "contiguous_grp")
)
df.show()
# +---+-------------------+-------------------+-----+
# |key| start_date| end_date|valid|
# +---+-------------------+-------------------+-----+
# | A|2022-01-11 00:00:00|8888-12-31 00:00:00| true|
# | B|2019-02-08 00:00:00|2020-02-15 00:00:00|false|
# | B|2020-01-01 00:00:00|2022-02-10 00:00:00|false|
# | B|2022-02-16 00:00:00|2022-12-15 00:00:00| true|
# | C|2018-01-01 00:00:00|2122-02-10 00:00:00| true|
# +---+-------------------+-------------------+-----+
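For intuition, here is how the intermediate columns work out for key B (traced by hand from the test input, ordered by start_date):

# start_date  end_date    max_end     lag(max_end)  contiguous  contiguous_grp
# 2019-02-08  2020-02-15  2020-02-15  null          0           0
# 2020-01-01  2022-02-10  2022-02-10  2020-02-15    0           0
# 2022-02-16  2022-12-15  2022-12-15  2022-02-10    1           1

Group 0 contains two rows, so both get valid = false; group 1 is a singleton, so its row gets valid = true.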
Upvotes: 1