Adria Ciurana

Reputation: 934

Group by date range overlapping using PySpark

I am trying to group users who have used the same IP in overlapping date ranges.

This will let me know if two users share the same house because they have the same IP at the same time.

Also, I've been trying to implement it, but I can't find a way to do it with PySpark SQL. In fact, I think it can't be done with PySpark, and probably requires some other graph-oriented library.

The problem is the following:

| ip          | user       | start_date | end_date   |
| ----------- | ---------- | ---------- | ---------- |
| 192.168.1.1 | a          | 2022-01-01 | 2022-01-03 |
| 192.168.1.1 | a          | 2022-01-05 | 2022-01-07 |
| 192.168.1.1 | b          | 2022-01-06 | 2022-01-09 |
| 192.168.1.1 | c          | 2022-01-08 | 2022-01-11 |
| 192.168.1.2 | d          | 2022-01-08 | 2022-01-11 |
| 192.168.1.2 | e          | 2022-01-10 | 2022-01-11 |
| 192.168.1.2 | f          | 2022-01-16 | 2022-01-18 |

As we can see:

Expected output:

| ip          | users     | date_ranges |
| ----------- | --------- | ----------- |
| 192.168.1.1 | {a, b, c} | {2022-01-01 - 2022-01-03, 2022-01-05 - 2022-01-07, 2022-01-06 - 2022-01-09, 2022-01-08 - 2022-01-11} |
| 192.168.1.2 | {d, e}    | {2022-01-08 - 2022-01-11, 2022-01-10 - 2022-01-11} |
| 192.168.1.2 | {f}       | {2022-01-16 - 2022-01-18} |

Do you have any ideas on how to implement this?

I thought about using GraphFrames, but I don't even know where to start :S

Upvotes: 1

Views: 1393

Answers (1)

blackbishop

Reputation: 32650

One way to identify overlapping date intervals is to use a cumulative conditional sum over a window partitioned by ip and ordered by start_date. For each row in the partition, if start_date is greater than the maximum end_date of all preceding rows, then the row doesn't overlap with them (i.e. it starts a new group):

from pyspark.sql import functions as F, Window

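# per-ip window ordered by start_date; with an orderBy, the default frame
# runs from the start of the partition up to the current row
w = Window.partitionBy('ip').orderBy('start_date')

df1 = df.withColumn(
    # running max of end_date up to and including the current row
    "previous_end", F.max("end_date").over(w)
).withColumn(
    "group",
    # flag rows that start after everything before them has ended;
    # the cumulative sum of those flags is the group id
    F.sum(F.when(F.lag("previous_end").over(w) < F.col("start_date"), 1).otherwise(0)).over(w)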
).groupBy("ip", "group").agg(
    F.collect_list(
        F.struct("user", F.struct("start_date", "end_date").alias("date_ranges"))
    ).alias("sessions")
).select(
    "ip", "sessions.user", "sessions.date_ranges"
)

df1.show(truncate=False)
#+-----------+---------+------------------------------------------------------------------------------+
#|ip         |user     |date_ranges                                                                   |
#+-----------+---------+------------------------------------------------------------------------------+
#|192.168.1.1|[a]      |[{2022-01-01, 2022-01-03}]                                                    |
#|192.168.1.1|[a, b, c]|[{2022-01-05, 2022-01-07}, {2022-01-06, 2022-01-09}, {2022-01-08, 2022-01-11}]|
#|192.168.1.2|[d, e]   |[{2022-01-08, 2022-01-11}, {2022-01-10, 2022-01-11}]                          |
#|192.168.1.2|[f]      |[{2022-01-16, 2022-01-18}]                                                    |
#+-----------+---------+------------------------------------------------------------------------------+
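To check the logic without a Spark session, here is a rough pure-Python sketch of the same running-max idea on the sample rows from the question. The function name `group_overlaps` and the dict layout are made up for illustration, and it assumes the dates are ISO-formatted strings (so string comparison matches date order):

```python
from collections import defaultdict

# Sample rows from the question: (ip, user, start_date, end_date).
rows = [
    ("192.168.1.1", "a", "2022-01-01", "2022-01-03"),
    ("192.168.1.1", "a", "2022-01-05", "2022-01-07"),
    ("192.168.1.1", "b", "2022-01-06", "2022-01-09"),
    ("192.168.1.1", "c", "2022-01-08", "2022-01-11"),
    ("192.168.1.2", "d", "2022-01-08", "2022-01-11"),
    ("192.168.1.2", "e", "2022-01-10", "2022-01-11"),
    ("192.168.1.2", "f", "2022-01-16", "2022-01-18"),
]


def group_overlaps(rows):
    """Hypothetical helper: split each ip's rows into runs of overlapping ranges."""
    by_ip = defaultdict(list)
    for ip, user, start, end in rows:
        by_ip[ip].append((start, end, user))

    result = []
    for ip in sorted(by_ip):
        running_max_end = None  # max end_date over the rows already visited
        current = None
        # Visit rows in start_date order, like the ordered window.
        for start, end, user in sorted(by_ip[ip]):
            # A row opens a new group when it starts after every earlier row ended.
            if running_max_end is None or running_max_end < start:
                current = {"ip": ip, "users": [], "date_ranges": []}
                result.append(current)
            current["users"].append(user)
            current["date_ranges"].append((start, end))
            if running_max_end is None or end > running_max_end:
                running_max_end = end
    return result


groups = group_overlaps(rows)
for g in groups:
    print(g["ip"], g["users"], g["date_ranges"])
```

Because the comparison is strict, a range that starts on the same day a previous one ends stays in the same group, which matches the `<` used in the window version.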

Upvotes: 2
