Reputation: 401
Let's say I have logs of requests with a user IP and a timestamp. How do I find users that made 5 or more requests within a 1-second interval?
user_ip time
---------------------------
user_ip1 16:11:10.56
user_ip1 16:11:10.67
user_ip1 16:11:10.87
user_ip2 16:11:10.92
user_ip2 16:11:10.97
user_ip1 16:11:11.15
user_ip1 16:11:11.20
user_ip1 16:11:11.30
user_ip2 16:11:12.13
user_ip2 16:11:13.50
user_ip2 16:11:13.80
user_ip1 made 3 requests in 16:11:10 and 3 in 16:11:11, that is 6 requests within a 0.74-second range starting at 10.56 and ending at 11.30, while user_ip2's requests are spread out across multiple seconds.
The result on the test dataset should be a single user_ip1 row.
Upvotes: 0
Views: 365
Reputation: 401
Thanks to @Vitaly, I eventually came to this solution:
from datetime import datetime

from pyspark.sql.window import Window
from pyspark.sql.functions import asc, col, lag, udf
from pyspark.sql.types import StructType, StructField, TimestampType, StringType, FloatType

data = [("user_ip1", datetime.strptime("2020-12-12 16:11:10.56", "%Y-%m-%d %H:%M:%S.%f")),
        ("user_ip1", datetime.strptime("2020-12-12 16:11:10.67", "%Y-%m-%d %H:%M:%S.%f")),
        ("user_ip1", datetime.strptime("2020-12-12 16:11:10.87", "%Y-%m-%d %H:%M:%S.%f")),
        ("user_ip2", datetime.strptime("2020-12-12 16:11:10.92", "%Y-%m-%d %H:%M:%S.%f")),
        ("user_ip2", datetime.strptime("2020-12-12 16:11:10.97", "%Y-%m-%d %H:%M:%S.%f")),
        ("user_ip1", datetime.strptime("2020-12-12 16:11:11.15", "%Y-%m-%d %H:%M:%S.%f")),
        ("user_ip1", datetime.strptime("2020-12-12 16:11:11.20", "%Y-%m-%d %H:%M:%S.%f")),
        ("user_ip1", datetime.strptime("2020-12-12 16:11:11.30", "%Y-%m-%d %H:%M:%S.%f")),
        ("user_ip2", datetime.strptime("2020-12-12 16:11:12.13", "%Y-%m-%d %H:%M:%S.%f")),
        ("user_ip2", datetime.strptime("2020-12-12 16:11:13.50", "%Y-%m-%d %H:%M:%S.%f")),
        ("user_ip2", datetime.strptime("2020-12-12 16:11:13.80", "%Y-%m-%d %H:%M:%S.%f")),
        ("user_ip2", datetime.strptime("2020-12-12 16:11:13.80", "%Y-%m-%d %H:%M:%S.%f")),
        ("user_ip2", datetime.strptime("2020-12-12 16:11:13.80", "%Y-%m-%d %H:%M:%S.%f")),
        ("user_ip2", datetime.strptime("2020-12-12 16:11:13.80", "%Y-%m-%d %H:%M:%S.%f")),
        ]

streaming_structure = StructType([
    StructField('user_ip', StringType(), True),
    StructField('ts', TimestampType(), True),
])

views_df = spark.createDataFrame(data, schema=streaming_structure)

# Time difference between two timestamps in seconds (despite the name, not milliseconds);
# returns None when lag() has no row to look back to.
def get_ms_diff(start, end):
    if start is None or end is None:
        return None
    return end.timestamp() - start.timestamp()

get_ms_diff_udf = udf(get_ms_diff, FloatType())

# For every request, look back 5 rows within the same user_ip (ordered by time)
# and keep the rows where that span of requests fits into less than one second.
window = Window.partitionBy("user_ip").orderBy(asc("ts"))

suspicious_ips = views_df \
    .withColumn("five_ago", lag("ts", 5).over(window)) \
    .withColumn("diff", get_ms_diff_udf(col("five_ago"), col("ts"))) \
    .filter("diff < 1") \
    .select("user_ip", "ts", "five_ago", "diff") \
    .distinct()
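To reduce this to just the offending addresses, the helper columns can be dropped; with the sample data above this should leave a single user_ip1, matching the expected result from the question.

# Keep only the distinct IPs flagged by the lag-5 check above.
suspicious_ips.select("user_ip").distinct().show()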
Upvotes: 0
Reputation: 8206
A very straightforward technique: use lag(5) to obtain the row that precedes the current one by 5. If such a row does not exist, null is returned. Then simply compute the time diff and filter. Partial code below:
data = [
    ("user_ip1", "16:11:10.56"),
    ("user_ip1", "16:11:10.67"),
    ("user_ip1", "16:11:10.87"),
    ("user_ip2", "16:11:10.92"),
    ("user_ip2", "16:11:10.97"),
    ("user_ip1", "16:11:11.15"),
    ("user_ip1", "16:11:11.20"),
    ("user_ip1", "16:11:11.30"),
    ("user_ip2", "16:11:12.13"),
    ("user_ip2", "16:11:13.50"),
    ("user_ip2", "16:11:13.80")
]
columns = ["ip", "time"]
spark.createDataFrame(data).toDF(*columns).createOrReplaceTempView("data")
spark.sql("""
select ip,
time,
lag(time, 5) over (partition by ip order by time asc) five_occurences_ago
from data
""").show()
+--------+-----------+-------------------+
| ip| time|five_occurences_ago|
+--------+-----------+-------------------+
|user_ip1|16:11:10.56| null|
|user_ip1|16:11:10.67| null|
|user_ip1|16:11:10.87| null|
|user_ip1|16:11:11.15| null|
|user_ip1|16:11:11.20| null|
|user_ip1|16:11:11.30| 16:11:10.56|
|user_ip2|16:11:10.92| null|
|user_ip2|16:11:10.97| null|
|user_ip2|16:11:12.13| null|
|user_ip2|16:11:13.50| null|
|user_ip2|16:11:13.80| null|
+--------+-----------+-------------------+
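A sketch of how that partial code might be completed: parse the time strings into timestamps (the 'HH:mm:ss.SS' pattern is an assumption matching the two-digit fractions in the sample data), compute the gap to the request five occurrences earlier, and keep the IPs whose gap is under one second.

lagged = spark.sql("""
    select ip,
           cast(to_timestamp(time, 'HH:mm:ss.SS') as double) as ts_sec,
           cast(lag(to_timestamp(time, 'HH:mm:ss.SS'), 5)
                over (partition by ip order by time asc) as double) as five_ago_sec
    from data
""")

# Rows with no fifth predecessor have a null gap and are filtered out automatically.
lagged.filter("ts_sec - five_ago_sec < 1").select("ip").distinct().show()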
Upvotes: 1
Reputation: 3232
Since the windows here can start and end at arbitrary times rather than on fixed boundaries, we can use a Spark window function to identify the start of each new 1-second interval.
We calculate the time difference between the previous row and the current row using lag; if this difference is more than 1 second, the current row cannot belong to the window containing the previous row, so we start a new window for it.
After identifying which window each row belongs to, we can group by user_ip and window, apply a count aggregation, and filter to find the offending user IPs.
from datetime import datetime
from pyspark.sql import Window
from pyspark.sql import functions as F
max_rps = 5
data = [("user_ip1", datetime.strptime("16:11:10.56", "%H:%M:%S.%f")),
("user_ip1", datetime.strptime("16:11:10.67", "%H:%M:%S.%f")),
("user_ip1", datetime.strptime("16:11:10.87", "%H:%M:%S.%f")),
("user_ip2", datetime.strptime("16:11:10.92", "%H:%M:%S.%f")),
("user_ip2", datetime.strptime("16:11:10.97", "%H:%M:%S.%f")),
("user_ip1", datetime.strptime("16:11:11.15", "%H:%M:%S.%f")),
("user_ip1", datetime.strptime("16:11:11.20", "%H:%M:%S.%f")),
("user_ip1", datetime.strptime("16:11:11.30", "%H:%M:%S.%f")),
("user_ip2", datetime.strptime("16:11:12.13", "%H:%M:%S.%f")),
("user_ip2", datetime.strptime("16:11:13.50", "%H:%M:%S.%f")),
("user_ip2", datetime.strptime("16:11:13.80", "%H:%M:%S.%f"))]
window_spec = Window.partitionBy("user_ip").orderBy("time")

# diff:   gap to the previous request of the same IP (0 for the first request)
# rc:     1 when the gap exceeds 1 second, i.e. the row starts a new window
# window: running sum of rc = id of the window the row belongs to
df.withColumn("diff", F.col("time") - F.coalesce(F.lag("time").over(window_spec), F.col("time")))\
  .withColumn("rc", F.when(F.col("diff") <= F.expr("INTERVAL 1 SECOND"), 0).otherwise(1))\
  .withColumn("window", F.sum("rc").over(window_spec))\
  .groupBy("user_ip", "window")\
  .agg(F.count('time').alias('rps'))\
  .filter(F.col("rps") > max_rps)\
  .show(200, False)
+--------+------+---+
|user_ip |window|rps|
+--------+------+---+
|user_ip1|0 |6 |
+--------+------+---+
Upvotes: 1