Voilin

Reputation: 401

How to find and count rows with datetimes that happened within 1 second?

Let's say I have logs of requests with a user IP and a timestamp. How do I find users that made 5 or more requests within a 1-second interval?

user_ip     time
---------------------------
user_ip1    16:11:10.56
user_ip1    16:11:10.67
user_ip1    16:11:10.87
user_ip2    16:11:10.92
user_ip2    16:11:10.97
user_ip1    16:11:11.15
user_ip1    16:11:11.20
user_ip1    16:11:11.30
user_ip2    16:11:12.13
user_ip2    16:11:13.50
user_ip2    16:11:13.80

user_ip1 made 3 requests in 16:11:10 and 3 in 16:11:11, i.e. 6 requests within a 0.74-second range starting at 10.56 and ending at 11.30, while user_ip2's requests are spread out across multiple seconds.

The result on the test dataset should be a single user_ip1 row.

Upvotes: 0

Views: 365

Answers (3)

Voilin

Reputation: 401

Thanks to @Vitaliy, I eventually came to this solution:

from datetime import datetime
from pyspark.sql.window import Window
from pyspark.sql.functions import asc, col, lag, udf
from pyspark.sql.types import StructType, StructField, TimestampType, StringType, FloatType

data = [("user_ip1", datetime.strptime("2020-12-12 16:11:10.56", "%Y-%m-%d %H:%M:%S.%f")),
 ("user_ip1", datetime.strptime("2020-12-12 16:11:10.67", "%Y-%m-%d %H:%M:%S.%f")),
 ("user_ip1", datetime.strptime("2020-12-12 16:11:10.87", "%Y-%m-%d %H:%M:%S.%f")),
 ("user_ip2", datetime.strptime("2020-12-12 16:11:10.92", "%Y-%m-%d %H:%M:%S.%f")),
 ("user_ip2", datetime.strptime("2020-12-12 16:11:10.97", "%Y-%m-%d %H:%M:%S.%f")),
 ("user_ip1", datetime.strptime("2020-12-12 16:11:11.15", "%Y-%m-%d %H:%M:%S.%f")),
 ("user_ip1", datetime.strptime("2020-12-12 16:11:11.20", "%Y-%m-%d %H:%M:%S.%f")),
 ("user_ip1", datetime.strptime("2020-12-12 16:11:11.30", "%Y-%m-%d %H:%M:%S.%f")),
 ("user_ip2", datetime.strptime("2020-12-12 16:11:12.13", "%Y-%m-%d %H:%M:%S.%f")),
 ("user_ip2", datetime.strptime("2020-12-12 16:11:13.50", "%Y-%m-%d %H:%M:%S.%f")),
 ("user_ip2", datetime.strptime("2020-12-12 16:11:13.80", "%Y-%m-%d %H:%M:%S.%f")),
 ("user_ip2", datetime.strptime("2020-12-12 16:11:13.80", "%Y-%m-%d %H:%M:%S.%f")),
 ("user_ip2", datetime.strptime("2020-12-12 16:11:13.80", "%Y-%m-%d %H:%M:%S.%f")),
 ("user_ip2", datetime.strptime("2020-12-12 16:11:13.80", "%Y-%m-%d %H:%M:%S.%f")),
]

streaming_structure = StructType([
    StructField('user_ip', StringType(), True),
    StructField('ts', TimestampType(), True),
])

views_df = spark.createDataFrame(data, schema=streaming_structure)


# Note: despite the name, this returns the difference in seconds (as a float),
# which is what the "diff < 1" filter below expects.
def get_ms_diff(start, end):
    if start is None or end is None:
        return None

    return end.timestamp() - start.timestamp()

get_ms_diff_udf = udf(get_ms_diff, FloatType())

# For every request, look at the request 5 rows earlier for the same user_ip;
# if those 6 requests fall within less than 1 second, flag the row.
window = Window.partitionBy("user_ip").orderBy(asc("ts"))
suspicious_ips = views_df \
    .withColumn("five_ago", lag("ts", 5).over(window)) \
    .withColumn("diff", get_ms_diff_udf(col("five_ago"), col("ts"))) \
    .filter("diff < 1") \
    .select("user_ip", "ts", "five_ago", "diff") \
    .distinct()
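
If only the offending IPs are needed rather than one row per flagged request, something like the following should work on the suspicious_ips DataFrame above:

suspicious_ips.select("user_ip").distinct().show()

On the test data this prints a single user_ip1 row, which matches the expected result.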

Upvotes: 0

Vitaliy

Reputation: 8206

A very straightforward technique: use lag(time, 5) to obtain the row that precedes the current one by 5 positions. If no such row exists, null is returned. Then simply compute the time difference and filter. Partial code below:

data = [
    ("user_ip1", "16:11:10.56"),
    ("user_ip1", "16:11:10.67"),
    ("user_ip1", "16:11:10.87"),
    ("user_ip2", "16:11:10.92"),
    ("user_ip2","16:11:10.97"),
    ("user_ip1", "16:11:11.15"),
    ("user_ip1", "16:11:11.20"),
    ("user_ip1", "16:11:11.30"),
    ("user_ip2", "16:11:12.13"),
    ("user_ip2", "16:11:13.50"),
    ("user_ip2", "16:11:13.80")
    ]


columns = ["ip", "time"]

spark.createDataFrame(data).toDF(*columns).createOrReplaceTempView("data")

spark.sql("""
  select ip,
        time,
        lag(time, 5) over (partition by ip order by time asc) five_occurences_ago
  from data
""").show()

+--------+-----------+-------------------+
|      ip|       time|five_occurences_ago|
+--------+-----------+-------------------+
|user_ip1|16:11:10.56|               null|
|user_ip1|16:11:10.67|               null|
|user_ip1|16:11:10.87|               null|
|user_ip1|16:11:11.15|               null|
|user_ip1|16:11:11.20|               null|
|user_ip1|16:11:11.30|        16:11:10.56|
|user_ip2|16:11:10.92|               null|
|user_ip2|16:11:10.97|               null|
|user_ip2|16:11:12.13|               null|
|user_ip2|16:11:13.50|               null|
|user_ip2|16:11:13.80|               null|
+--------+-----------+-------------------+
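
To finish the "compute the time diff and filter" step, one possible sketch (it assumes an arbitrary date can be prefixed to the time strings so they parse with to_timestamp; casting a timestamp to double yields epoch seconds including the fractional part):

spark.sql("""
  with lagged as (
    select ip,
           to_timestamp(concat('2020-12-12 ', time)) as ts,
           lag(to_timestamp(concat('2020-12-12 ', time)), 5)
             over (partition by ip order by time asc) as five_occurences_ago
    from data
  )
  select distinct ip
  from lagged
  where cast(ts as double) - cast(five_occurences_ago as double) < 1
""").show()

On the sample data this returns only user_ip1.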

Upvotes: 1

Nithish

Reputation: 3232

Since the 1-second intervals can start and end at arbitrary times (they are not aligned to clock seconds), we can use a Spark window function to identify where each new 1-second interval begins.

We calculate the time difference between the previous row and the current row using lag. If this difference is more than 1 second, the current row cannot belong to the window containing the previous row, so we start a new window for it.

After identifying the window each row belongs to, we can group by user_ip and window, apply a count aggregation, and filter to find the matching user IPs.

from datetime import datetime
from pyspark.sql import Window
from pyspark.sql import functions as F

max_rps = 5

data = [("user_ip1", datetime.strptime("16:11:10.56", "%H:%M:%S.%f")),
 ("user_ip1", datetime.strptime("16:11:10.67", "%H:%M:%S.%f")),
 ("user_ip1", datetime.strptime("16:11:10.87", "%H:%M:%S.%f")),
 ("user_ip2", datetime.strptime("16:11:10.92", "%H:%M:%S.%f")),
 ("user_ip2", datetime.strptime("16:11:10.97", "%H:%M:%S.%f")),
 ("user_ip1", datetime.strptime("16:11:11.15", "%H:%M:%S.%f")),
 ("user_ip1", datetime.strptime("16:11:11.20", "%H:%M:%S.%f")),
 ("user_ip1", datetime.strptime("16:11:11.30", "%H:%M:%S.%f")),
 ("user_ip2", datetime.strptime("16:11:12.13", "%H:%M:%S.%f")),
 ("user_ip2", datetime.strptime("16:11:13.50", "%H:%M:%S.%f")),
 ("user_ip2", datetime.strptime("16:11:13.80", "%H:%M:%S.%f"))]

df = spark.createDataFrame(data, ["user_ip", "time"])

window_spec = Window.partitionBy("user_ip").orderBy("time")

# A gap of more than 1 second from the previous request starts a new window;
# the running sum of those flags assigns every row a window id per user_ip.
df.withColumn("diff", F.col("time") - F.coalesce(F.lag("time").over(window_spec), F.col("time")))\
.withColumn("rc", F.when(F.col("diff") <= F.expr("INTERVAL 1 SECOND"), 0).otherwise(1))\
.withColumn("window", F.sum("rc").over(window_spec))\
.groupBy("user_ip", "window")\
.agg(F.count('time').alias('rps'))\
.filter(F.col("rps") > max_rps)\
.show(200, False)

Output

+--------+------+---+
|user_ip |window|rps|
+--------+------+---+
|user_ip1|0     |6  |
+--------+------+---+
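
Since the question asks for 5 or more requests (not strictly more than 5), the final filter can be tightened to F.col("rps") >= max_rps; on this dataset the output is the same. To get just the offending IPs, the final .show() can be replaced with something like .select("user_ip").distinct().show().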

Upvotes: 1
