Konrads

Reputation: 2274

Shared state in Spark?

I am reading in firewall log data which consists of the following:

(UniqueID, start_or_stop, timestamp)

At some point each “start” is followed by a “stop”, and when that happens I want to output (UniqueID, start_time, stop_time).

This usually means tracking state: when the line reader sees a start, it records it in a dictionary; when it sees a stop, it removes the matching start from the dictionary and emits the output.

My question is: how can I track such shared state using Apache Spark?

It’s worth pointing out that a UniqueID can be reused once a stop has been reached: it is built from sourceIP-sourcePort-destIP-destPort, and that combination can occur again.
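For reference, the single-process, dictionary-based approach described above might look roughly like the sketch below. It is plain Scala without Spark, and the names `Event`, `Session` and `SessionTracker` are hypothetical, introduced only for illustration. It assumes the events arrive in timestamp order.

```scala
import scala.collection.mutable

// Hypothetical record types; the fields mirror (UniqueID, start_or_stop, timestamp).
final case class Event(uniqueId: String, isStart: Boolean, timestamp: String)
final case class Session(uniqueId: String, startTime: String, stopTime: String)

object SessionTracker {
  // Walks the events in order and pairs each stop with the pending start of the same ID.
  def pair(events: Seq[Event]): Seq[Session] = {
    val open = mutable.Map.empty[String, String] // UniqueID -> start timestamp
    val out = mutable.ArrayBuffer.empty[Session]
    for (e <- events) {
      if (e.isStart) {
        open(e.uniqueId) = e.timestamp // remember the start
      } else {
        // Remove the pending start (if any) and emit the completed session.
        open.remove(e.uniqueId).foreach { start =>
          out += Session(e.uniqueId, start, e.timestamp)
        }
      }
    }
    out.toSeq
  }
}
```

Because the map lives in a single process and depends on the order in which lines are read, this pattern does not carry over directly to a distributed job, which is what motivates the question.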

Upvotes: 0

Views: 609

Answers (1)

sujit

Reputation: 2328

Assuming that, for a given UniqueID, a stop row always follows its start row, consider the input below (0 is a start event and 1 is a stop event):

UniqueID,start_or_stop,timestamp
u1,0,2018-01-22 13:04:32
u2,0,2018-01-22 13:04:35
u2,1,2018-01-25 18:55:08
u3,0,2018-01-25 18:56:17
u1,1,2018-01-25 20:51:43
u2,0,2018-01-31 07:48:43
u3,1,2018-01-31 07:48:48
u1,0,2018-02-02 09:40:58
u2,1,2018-02-02 09:41:01
u1,1,2018-02-05 14:03:27

Then we can apply the transformations below to get what you want. The code is in Scala, but the same functions are available in Python, so it should be straightforward to port:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.last
import spark.implicits._ // enables the 'column syntax; assumes a SparkSession named spark

// Define the window specification: partition by UniqueID, sort by timestamp,
// and let the frame cover the current row and the next one (a start and its stop)
val wSpec = Window.partitionBy('UniqueID).orderBy('timestamp).rowsBetween(0, 1)

// Assume df is the DataFrame loaded with the above data
df.withColumn("Last", last('timestamp) over wSpec). // add a new column holding the stop time
    where("start_or_stop = 0").  // keep only the start rows
    drop("start_or_stop"). // drop the flag column
    withColumnRenamed("timestamp", "start_time"). // rename to start
    withColumnRenamed("Last", "stop_time").  // rename to stop
    show(false)

This produces the following output:

+--------+---------------------+---------------------+
|UniqueID|start_time           |stop_time            |
+--------+---------------------+---------------------+
|u3      |2018-01-25 18:56:17.0|2018-01-31 07:48:48.0|
|u1      |2018-01-22 13:04:32.0|2018-01-25 20:51:43.0|
|u1      |2018-02-02 09:40:58.0|2018-02-05 14:03:27.0|
|u2      |2018-01-22 13:04:35.0|2018-01-25 18:55:08.0|
|u2      |2018-01-31 07:48:43.0|2018-02-02 09:41:01.0|
+--------+---------------------+---------------------+
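The snippet above assumes `df` already exists. As a rough, self-contained sketch, the same pairing could be run end to end as shown below. This is a variant rather than the code above: it swaps `last` over a two-row frame for `lead`, and it additionally checks that the next row is a stop, so a start with no stop yet gets a null `stop_time` (with the two-row frame, the last row of a partition would see only itself). The object name `PairStartStop`, the function `pairEvents`, and the local-mode session are assumptions made for illustration.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, lead, to_timestamp, when}

object PairStartStop {
  // Pairs each start row (start_or_stop = 0) with the next row of the same UniqueID,
  // but only when that next row is a stop (start_or_stop = 1).
  def pairEvents(df: DataFrame): DataFrame = {
    val w = Window.partitionBy("UniqueID").orderBy("timestamp")
    val nextIsStop = lead(col("start_or_stop"), 1).over(w) === 1
    df.withColumn("timestamp", to_timestamp(col("timestamp")))
      .withColumn("stop_time", when(nextIsStop, lead(col("timestamp"), 1).over(w)))
      .where(col("start_or_stop") === 0)
      .select(col("UniqueID"), col("timestamp").as("start_time"), col("stop_time"))
  }

  def main(args: Array[String]): Unit = {
    // Local session for experimentation only (an assumption, not part of the answer).
    val spark = SparkSession.builder().master("local[*]").appName("pair-start-stop").getOrCreate()
    import spark.implicits._

    // The sample input from the answer.
    val df = Seq(
      ("u1", 0, "2018-01-22 13:04:32"),
      ("u2", 0, "2018-01-22 13:04:35"),
      ("u2", 1, "2018-01-25 18:55:08"),
      ("u3", 0, "2018-01-25 18:56:17"),
      ("u1", 1, "2018-01-25 20:51:43"),
      ("u2", 0, "2018-01-31 07:48:43"),
      ("u3", 1, "2018-01-31 07:48:48"),
      ("u1", 0, "2018-02-02 09:40:58"),
      ("u2", 1, "2018-02-02 09:41:01"),
      ("u1", 1, "2018-02-05 14:03:27")
    ).toDF("UniqueID", "start_or_stop", "timestamp")

    pairEvents(df).show(false)
    spark.stop()
  }
}
```

Both variants rely on the window being partitioned by UniqueID, so Spark does the grouping and ordering itself and no shared mutable dictionary is needed.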

Upvotes: 1
