Reputation: 11
I want to calculate time difference between events of a session using Scala.
The source is a CSV file, as shown below:
HEADER
"session","events","timestamp","Records"
DATA
"session_1","event_1","2015-01-01 10:10:00",100
"session_1","event_2","2015-01-01 11:00:00",500
"session_1","event_3","2015-01-01 11:30:00",300
"session_1","event_4","2015-01-01 11:45:00",300
"session_2","event_1","2015-01-01 10:10:00",100
"session_2","event_2","2015-01-01 11:00:00",500
REQUIRED OUTPUT
HEADER
"session","events","time_spent_in_minutes","total_records"
DATA
"session_1","event_1","50",100
"session_1","event_2","30",600
"session_1","event_3","15",900
"session_1","event_4","0",1200
"session_2","event_1","50",100
"session_2","event_2","0",600
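Here time_spent_in_minutes is the difference between the current event and the next event within a session (0 for the last event of a session), and total_records is the running total of Records within the session. A header is not required in the output, but it would be good to have.
I am new to Scala, so here is what I have so far: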
$ cat test.csv
"session_1","event_1","2015-01-01 10:10:00",100
"session_1","event_2","2015-01-01 11:00:00",500
"session_1","event_3","2015-01-01 11:30:00",300
"session_1","event_4","2015-01-01 11:45:00",300
"session_2","event_1","2015-01-01 10:10:00",100
"session_2","event_2","2015-01-01 11:00:00",500
scala> val sessionFile = sc.textFile("test.csv").
map(_.split(',')).
map(e => (e(1).trim, Sessions(e(0).trim,e(1).trim,e(2).trim,e(3).trim.toInt))).
foreach(println)
("event_1",Sessions("session_2","event_1","2015-01-01 10:10:00",100))
("event_1",Sessions("session_1","event_1","2015-01-01 10:10:00",100))
("event_2",Sessions("session_2","event_2","2015-01-01 11:00:00",500))
("event_2",Sessions("session_1","event_2","2015-01-01 11:00:00",500))
("event_3",Sessions("session_1","event_3","2015-01-01 11:30:00",300))
("event_4",Sessions("session_1","event_4","2015-01-01 11:45:00",300))
sessionFile: Unit = ()
scala>
Upvotes: 0
Views: 3924
Reputation: 1138
Try something like this:
import org.joda.time.format._
import org.joda.time._
val d1 = DateTime.parse("2015-03-03", DateTimeFormat.forPattern("yyyy-MM-dd"))
val d2 = DateTime.parse("2015-03-04", DateTimeFormat.forPattern("yyyy-MM-dd"))
// Later minus earlier, so the difference in milliseconds is positive
d2.getMillis() - d1.getMillis()
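The millisecond difference above can be turned into the minutes the question asks for. The following is a minimal sketch, not the answerer's code: it swaps Joda-Time for java.time from the JDK so that it runs without extra dependencies, and the names fmt and minutesBetween are made up for illustration.

```scala
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter
import java.time.temporal.ChronoUnit

// Pattern for the timestamps in test.csv, without the surrounding quotes.
val fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")

// Hypothetical helper: whole minutes elapsed from `start` to `end`.
def minutesBetween(start: String, end: String): Long =
  ChronoUnit.MINUTES.between(
    LocalDateTime.parse(start, fmt),
    LocalDateTime.parse(end, fmt))

// event_1 -> event_2 of session_1
val spent = minutesBetween("2015-01-01 10:10:00", "2015-01-01 11:00:00") // 50
```

With Joda-Time the same value could be obtained by dividing the millisecond difference by 60000.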
Upvotes: 0
Reputation: 8851
Here is a solution that uses the Joda-Time library.
val input =
""""session_1","event_1","2015-01-01 10:10:00",100
"session_1","event_2","2015-01-01 11:00:00",500
"session_1","event_3","2015-01-01 11:30:00",300
"session_1","event_4","2015-01-01 11:45:00",300
"session_2","event_1","2015-01-01 10:10:00",100
"session_2","event_2","2015-01-01 11:00:00",500"""
Create an RDD from the text input; the same lines can instead be read from a file using sc.textFile.
import org.joda.time.format._
import org.joda.time._
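// The pattern contains literal double quotes because the fields
// produced by split(",") still carry their surrounding quotes.
def strToTime(s: String): Long = {
  DateTimeFormat.forPattern(""""yyyy-MM-dd HH:mm:ss"""")
    .parseDateTime(s).getMillis() / 1000 // seconds since the epoch
}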
val r1 = sc.parallelize(input.split("\n"))
.map(_.split(","))
.map(x => (x(0), (x(1), x(2), x(3))))
.groupBy(_._1)
.map(_._2.map{ case(s, (e, timestr, r)) =>
(s, (e, strToTime(timestr), r))}
.toArray
.sortBy( z => z match {
case (session, (event, time, records)) => time}))
Timestamps such as "2015-01-01 10:10:00" are converted to seconds since the epoch, and the events of each session are sorted by time.
val r2 = r1.map(x => x :+ { val y = x.last;
y match {
case (session, (event, time, records)) =>
(session, (event, time, "0")) }})
This appends an extra event to each session, identical to the session's last event except that its record count is "0". The extra event lets the duration calculation yield 0 for the last real event.
Use sliding to get pairs of consecutive events.
val r3 = r2.map(x => x.sliding(2).toArray)
val r4 = r3.map(x => x.map{
case Array((s1, (e1, t1, c1)), (s2, (e2, t2, c2))) =>
(s1, (e1, (t2 - t1)/60, c1)) } )
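The padding and sliding steps can be seen in isolation on a plain Scala collection, without Spark. This is only an illustrative sketch: the values in times are the session_1 timestamps expressed as seconds relative to 10:10:00, and the names times, padded and minutesSpent are assumptions made for the example.

```scala
// session_1 events at 10:10, 11:00, 11:30 and 11:45,
// as seconds relative to the first event.
val times = Vector(0L, 3000L, 4800L, 5700L)

// Repeat the last timestamp so the final event is paired with itself.
val padded = times :+ times.last

// sliding(2) yields every element together with its successor;
// the difference in seconds is converted to whole minutes.
val minutesSpent = padded
  .sliding(2)
  .map { case Seq(t1, t2) => (t2 - t1) / 60 }
  .toVector
// minutesSpent == Vector(50, 30, 15, 0)
```

Without the padded element, sliding(2) would produce only three pairs and the last event would be lost.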
Use scan to add up the record counts incrementally.
val r5 = r4.map(x => x.zip(x.map{ case (s, (e, t, r)) => r.toInt}
.scan(0)(_+_)
.drop(1)))
val r6 = r5.map(x => x.map{ case ((s, (e, t, r)), recordstillnow) =>
s"${s},${e},${t},${recordstillnow}" })
val r7 = r6.flatMap(x => x)
r7.collect.mkString("\n")
//"session_2","event_1",50,100
//"session_2","event_2",0,600
//"session_1","event_1",50,100
//"session_1","event_2",30,600
//"session_1","event_3",15,900
//"session_1","event_4",0,1200
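The running total produced by scan can also be checked on a plain Scala collection. The sketch below assumes the record counts of session_1 in time order; the names records, runningTotal and sameTotal are made up for the example.

```scala
// Record counts of session_1's events, in time order.
val records = Vector(100, 500, 300, 300)

// scan emits the seed followed by every partial sum;
// drop(1) discards the seed so one total remains per event.
val runningTotal = records.scan(0)(_ + _).drop(1)
// runningTotal == Vector(100, 600, 900, 1200)

// scanLeft makes the left-to-right order explicit; tail drops the seed.
val sameTotal = records.scanLeft(0)(_ + _).tail
```

Because addition is associative, scan and scanLeft give the same result here; scanLeft is the safer choice when the order of combination matters.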
Upvotes: 3