Reputation: 2274
I am reading in firewall log data which consists of the following:
(UniqueID, start_or_stop, timestamp). At some point each "start" is followed by a "stop", and when that happens I want to output (UniqueID, start_time, stop_time).
This normally means tracking state: when a line reader sees a start it records it in a dictionary, and when it sees the matching stop it removes the start from the dictionary and emits the output.
My question is: how do I track such shared state using Apache Spark?
It's worth pointing out that a UniqueID can be reused once a stop has been reached - it is made from sourceIP-sourcePort-destIP-destPort, which can be reused.
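For illustration, outside of Spark the single-process version of this bookkeeping would look roughly like the sketch below (the Event case class and the events iterator are just illustrative names, not my actual code):
import scala.collection.mutable

//0 = start, 1 = stop
case class Event(uniqueId: String, startOrStop: Int, timestamp: String)

//Keep open starts in a map keyed by UniqueID; emit a triple on each stop
def pairEvents(events: Iterator[Event]): Iterator[(String, String, String)] = {
  val open = mutable.Map.empty[String, String] //UniqueID -> start_time
  events.flatMap { e =>
    if (e.startOrStop == 0) { //start: remember the start time
      open(e.uniqueId) = e.timestamp
      None
    } else { //stop: emit (UniqueID, start_time, stop_time) and free the id for reuse
      open.remove(e.uniqueId).map(start => (e.uniqueId, start, e.timestamp))
    }
  }
}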
Upvotes: 0
Views: 609
Reputation: 2328
Assuming the constraint that a row indicating stop will always follow a row indicating start for a given UniqueID, consider the input below (0 is a start and 1 is a stop event):
UniqueID,start_or_stop,timestamp
u1,0,2018-01-22 13:04:32
u2,0,2018-01-22 13:04:35
u2,1,2018-01-25 18:55:08
u3,0,2018-01-25 18:56:17
u1,1,2018-01-25 20:51:43
u2,0,2018-01-31 07:48:43
u3,1,2018-01-31 07:48:48
u1,0,2018-02-02 09:40:58
u2,1,2018-02-02 09:41:01
u1,1,2018-02-05 14:03:27
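For reference, the DataFrame used below can be created from this sample roughly as follows (the file name events.csv and the SparkSession are assumptions, not part of the question):
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder().appName("StartStopPairs").getOrCreate()

//Read the sample CSV and make sure the timestamp column has a timestamp type
val df = spark.read.
  option("header", "true").
  option("inferSchema", "true").
  csv("events.csv").
  withColumn("timestamp", col("timestamp").cast("timestamp"))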
Then, we can apply the transformations below to get what you want. The code is in Scala, but the same functions are available in Python, so it can easily be ported:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.last
import spark.implicits._ //for the 'colName symbol syntax

//Define the window specification: after partitioning and sorting, the frame
//covers the 2 rows that contain the start/stop time for each pair
val wSpec = Window.partitionBy('UniqueID).orderBy('timestamp).rowsBetween(0, 1)

//Assume df is the DataFrame loaded with the above data
df.withColumn("Last", last('timestamp) over wSpec). //add a new col holding the stop time
  where("start_or_stop = 0").                       //keep only the start rows
  drop("start_or_stop").                            //drop the flag column
  withColumnRenamed("timestamp", "start_time").     //rename to start_time
  withColumnRenamed("Last", "stop_time").           //rename to stop_time
  show(false)
This produces the output below. Because the window frame only spans a start row and the row immediately after it (its stop), a UniqueID that is reused after a stop simply yields another start/stop pair, as seen for u1 and u2:
+--------+---------------------+---------------------+
|UniqueID|start_time |stop_time |
+--------+---------------------+---------------------+
|u3 |2018-01-25 18:56:17.0|2018-01-31 07:48:48.0|
|u1 |2018-01-22 13:04:32.0|2018-01-25 20:51:43.0|
|u1 |2018-02-02 09:40:58.0|2018-02-05 14:03:27.0|
|u2 |2018-01-22 13:04:35.0|2018-01-25 18:55:08.0|
|u2 |2018-01-31 07:48:43.0|2018-02-02 09:41:01.0|
+--------+---------------------+---------------------+
Upvotes: 1