Reputation: 934
I am trying to perform a kind of grouping with users who have used the same IP in a overlapping range of dates.
This will let me know if two users share the same house because they have the same IP at the same time.
Also, I've been trying to implement it, but I can't find a way to do it with PySpark SQL. In fact, I think it can't be done with PySpark, and probably requires some other graph-oriented library.
The problem is the following:
| ip | user | start_date | end_date |
| ----------- | ---------- | ---------- | ---------- |
| 192.168.1.1 | a | 2022-01-01 | 2022-01-03 |
| 192.168.1.1 | a | 2022-01-05 | 2022-01-07 |
| 192.168.1.1 | b | 2022-01-06 | 2022-01-09 |
| 192.168.1.1 | c | 2022-01-08 | 2022-01-11 |
| 192.168.1.2 | d | 2022-01-08 | 2022-01-11 |
| 192.168.1.2 | e | 2022-01-10 | 2022-01-11 |
| 192.168.1.2 | f | 2022-01-16 | 2022-01-18 |
As we can see:
a
, b
overlaps in range and same ip
.b
, c
overlaps in range and same ip
.a
and c
are in the same group.d
, e
overlaps in range and same ip
.f
not overlap with respect other user.Expected output:
| ip | users | date_ranges
| ----------- | ----------- | ------------------- | ------------------- |
| 192.168.1.1 | {a, b, c} | {2022-01-01 - 2022-01-03, 2022-01-05 - 2022-01-07, 2022-01-06 - 2022-01-09, 2022-01-08 - 2022-01-11} |
| 192.168.1.2 | {d, e} | {2022-01-08 - 2022-01-11, 2022-01-10-2022-01-11} |
| 192.168.1.1 | {f} | {2022-01-16 - 2022-01-18} |
Do you have any ideas on how to implement this?
I thought about using GraphFrames, but I don't even know where to start :S
Upvotes: 1
Views: 1393
Reputation: 32650
One way to identify overlapping date intervals it to use a cumulative conditional sum over a window partitioned by ip
and ordered by start_date
. For each row in a frame, if the start_date
is greater than max(end_date)
before the current row then it doesn't overlaps (i.e. it's a new group):
from pyspark.sql import functions as F, Window
w = Window.partitionBy('ip').orderBy('start_date')
df1 = df.withColumn(
"previous_end", F.max("end_date").over(w)
).withColumn(
"group",
F.sum(F.when(F.lag("previous_end").over(w) < F.col("start_date"), 1).otherwise(0)).over(w)
).groupBy("ip", "group").agg(
F.collect_list(
F.struct("user", F.struct("start_date", "end_date").alias("date_ranges"))
).alias("sessions")
).select(
"ip", "sessions.user", "sessions.date_ranges"
)
df1.show(truncate=False)
#+-----------+---------+------------------------------------------------------------------------------+
#|ip |user |date_ranges |
#+-----------+---------+------------------------------------------------------------------------------+
#|192.168.1.1|[a] |[{2022-01-01, 2022-01-03}] |
#|192.168.1.1|[a, b, c]|[{2022-01-05, 2022-01-07}, {2022-01-06, 2022-01-09}, {2022-01-08, 2022-01-11}]|
#|192.168.1.2|[d, e] |[{2022-01-08, 2022-01-11}, {2022-01-10, 2022-01-11}] |
#|192.168.1.2|[f] |[{2022-01-16, 2022-01-18}] |
#+-----------+---------+------------------------------------------------------------------------------+
Upvotes: 2