Oleksiy

Reputation: 6567

Sessionization with Spark

I have a bunch of events in spark (user clicks/actions/button presses) characterized by a note column:

>>> df.show(20)
+-------+-------------+------------+------+
|   user|    timestamp|        note|action|
+-------+-------------+------------+------+
|2376466|1458580817381|event #1 ...|UPDATE|
|2376466|1458580822034|event #1 ...|UPDATE|
|2376466|1458580822112|event #2 ...|UPDATE|
|2376466|1458580822166|event #2 ...|UPDATE|
|2376466|1458580822216|event #2 ...|UPDATE|
|2376466|1458580822225|event #2 ...|UPDATE|
|2376466|1458580822651|event #1 ...|UPDATE|
|2376466|1458580822660|event #1 ...|UPDATE|
+-------+-------------+------------+------+

I'd like to know the duration of a "session" of a particular note. For example, event #2 started at 1458580822112 and ended at 1458580822225, so its duration would be ..225 - ..112 = 113 ms. Are there any Spark helpers or shortcuts for organizing data into "sessions", or some other way to extract information like this? Or is the idea to keep adding state information to each row and roll it up once a session identifier column is ready?

NOTE: multiple notes of the same type should be considered to be separate sessions.
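The "add state to each row, then roll it up" idea is the usual gaps-and-islands approach: order each user's events by time, start a new session whenever the note changes, then take the last minus the first timestamp per session. Below is a minimal pure-Python sketch of that logic on the sample rows above. It is not Spark code, and it assumes the notes are exactly "event #1"/"event #2", since the real values are truncated in the `df.show()` output.

```python
from itertools import groupby

# Sample rows from the question as (user, timestamp_ms, note).
# Assumption: the note text is just "event #N"; df.show() truncates the real values.
events = [
    (2376466, 1458580817381, "event #1"),
    (2376466, 1458580822034, "event #1"),
    (2376466, 1458580822112, "event #2"),
    (2376466, 1458580822166, "event #2"),
    (2376466, 1458580822216, "event #2"),
    (2376466, 1458580822225, "event #2"),
    (2376466, 1458580822651, "event #1"),
    (2376466, 1458580822660, "event #1"),
]


def sessionize(rows):
    """Return (user, note, start, end, duration_ms) for each run of consecutive identical notes."""
    sessions = []
    # Sort by user, then timestamp, so each user's events are contiguous and chronological.
    ordered = sorted(rows, key=lambda r: (r[0], r[1]))
    # groupby only merges *consecutive* rows with the same key, so a new group
    # (i.e. a new session) starts whenever the user or the note changes.
    for (user, note), run in groupby(ordered, key=lambda r: (r[0], r[2])):
        stamps = [ts for _, ts, _ in run]
        sessions.append((user, note, stamps[0], stamps[-1], stamps[-1] - stamps[0]))
    return sessions


for session in sessionize(events):
    print(session)
```

This yields three sessions (event #1, event #2, event #1) with durations 4653, 113 and 9 ms, so the two runs of event #1 stay separate as the NOTE requires. In Spark the same idea is typically expressed with window functions: `lag("note")` over a window partitioned by user and ordered by timestamp flags the rows where the note changes, a running sum of those flags gives a session id, and a `groupBy` on user and session id aggregates `max(timestamp) - min(timestamp)`.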

Upvotes: 0

Views: 2352

Answers (1)

charles gomes

Reputation: 2155

You can use Spark SQL to achieve your goal. Here is some code that works for me for computing sessions. You can write a helper function and register it as a UDF, which can then be called in your SQL statement.

import sqlContext.implicits._

df.registerTempTable("Events")

// Keep only the note prefix (e.g. "event #1"); modify this according to the exact values in your note column.
def process(colname: String): String = colname.substring(0, 8)

sqlContext.udf.register("process", process _)
val dd = sqlContext.sql("select timestamp as timestamp, process(note) as note from Events")

dd.registerTempTable("SubEvents")

// max - min rather than last - first: first()/last() depend on row order, which is not guaranteed after a shuffle.
val dt = sqlContext.sql("select max(timestamp) - min(timestamp) as session, note from SubEvents group by note")

dt.show()
+-------+--------+
|session|    note|
+-------+--------+
|      5|event #1|
|      2|event #2|
|      1|event #3|
+-------+--------+
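To see what the group-by-note query computes, here is a minimal pure-Python sketch that mirrors its logic (note prefix via the `process` helper, then max minus min timestamp per note) on the question's sample rows. The full note text is an assumption, since `df.show()` truncates it to "event #1 ...".

```python
from collections import defaultdict

# Sample rows from the question as (timestamp_ms, note).
# Assumption: the truncated notes literally start with "event #1" / "event #2".
events = [
    (1458580817381, "event #1 ..."),
    (1458580822034, "event #1 ..."),
    (1458580822112, "event #2 ..."),
    (1458580822166, "event #2 ..."),
    (1458580822216, "event #2 ..."),
    (1458580822225, "event #2 ..."),
    (1458580822651, "event #1 ..."),
    (1458580822660, "event #1 ..."),
]


def process(note):
    # Same idea as the Scala UDF: keep the first 8 characters, e.g. "event #1".
    return note[:8]


def duration_by_note(rows):
    """Mirror of: select max(timestamp) - min(timestamp) as session, note ... group by note."""
    stamps = defaultdict(list)
    for ts, note in rows:
        stamps[process(note)].append(ts)
    return {note: max(ts) - min(ts) for note, ts in stamps.items()}


print(duration_by_note(events))
```

Because it groups by note only, event #2 gets the expected 113 ms, but the two separate runs of event #1 collapse into a single group spanning 5279 ms. So this query matches the question's NOTE only when each note occurs in a single contiguous run.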

The complete Bluemix Spark notebook can also be viewed here:

Thanks,

Charles.

Upvotes: 2
