BabaGee

Reputation: 11

How to calculate difference of time between two records using Scala?

I want to calculate the time difference between consecutive events of a session using Scala.

The source is a CSV file, as shown below:

HEADER 
"session","events","timestamp","Records"
DATA
"session_1","event_1","2015-01-01 10:10:00",100
"session_1","event_2","2015-01-01 11:00:00",500
"session_1","event_3","2015-01-01 11:30:00",300
"session_1","event_4","2015-01-01 11:45:00",300
"session_2","event_1","2015-01-01 10:10:00",100
"session_2","event_2","2015-01-01 11:00:00",500

REQUIRED OUTPUT

HEADER 
"session","events","time_spent_in_minutes","total_records"
DATA
"session_1","event_1","50",100
"session_1","event_2","30",600
"session_1","event_3","15",900
"session_1","event_4","0",1200
"session_2","event_1","50",100
"session_2","event_2","0",600

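Here time_spent_in_minutes is the difference between the current event and the next event of the same session, and total_records is the running total of Records within the session. The header is not required in the target, but it would be good to have.

I am new to Scala, so here is what I have so far: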

$ cat test.csv
"session_1","event_1","2015-01-01 10:10:00",100
"session_1","event_2","2015-01-01 11:00:00",500
"session_1","event_3","2015-01-01 11:30:00",300
"session_1","event_4","2015-01-01 11:45:00",300
"session_2","event_1","2015-01-01 10:10:00",100
"session_2","event_2","2015-01-01 11:00:00",500


scala> val sessionFile = sc.textFile("test.csv").
map(_.split(',')).
map(e => (e(1).trim, Sessions(e(0).trim,e(1).trim,e(2).trim,e(3).trim.toInt))).
foreach(println)

("event_1",Sessions("session_2","event_1","2015-01-01 10:10:00",100))
("event_1",Sessions("session_1","event_1","2015-01-01 10:10:00",100))
("event_2",Sessions("session_2","event_2","2015-01-01 11:00:00",500))
("event_2",Sessions("session_1","event_2","2015-01-01 11:00:00",500))
("event_3",Sessions("session_1","event_3","2015-01-01 11:30:00",300))
("event_4",Sessions("session_1","event_4","2015-01-01 11:45:00",300))
sessionFile: Unit = ()

scala>

Upvotes: 0

Views: 3924

Answers (2)

ben jarman

Reputation: 1138

Try something like this:

import org.joda.time.format._
import org.joda.time._
val d1 = DateTime.parse("2015-03-03", DateTimeFormat.forPattern("yyyy-MM-dd"))
val d2 = DateTime.parse("2015-03-04", DateTimeFormat.forPattern("yyyy-MM-dd"))
d2.getMillis() - d1.getMillis()  // later minus earlier: elapsed milliseconds (divide by 60000 for minutes)
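
The snippet above parses dates only, while the question's timestamps also carry a time of day. As a minimal sketch of the same idea, the block below uses the JDK's java.time instead of Joda-Time (a swapped-in library, chosen so the example has no external dependency). The names TimeDiff and minutesBetween are hypothetical, and the input is assumed to have its surrounding quotes already stripped.

```scala
import java.time.{Duration, LocalDateTime}
import java.time.format.DateTimeFormatter

object TimeDiff {
  // Format of the timestamp column in the question's CSV (quotes already removed).
  private val fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")

  // Whole minutes elapsed from `start` to `end`; negative if `end` is earlier.
  def minutesBetween(start: String, end: String): Long =
    Duration.between(LocalDateTime.parse(start, fmt), LocalDateTime.parse(end, fmt)).toMinutes
}

// Usage:
// TimeDiff.minutesBetween("2015-01-01 10:10:00", "2015-01-01 11:00:00")  // 50
```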

Upvotes: 0

Shyamendra Solanki

Reputation: 8851

Here is a solution that uses the Joda-Time library.

val input = 
""""session_1","event_1","2015-01-01 10:10:00",100
   "session_1","event_2","2015-01-01 11:00:00",500
   "session_1","event_3","2015-01-01 11:30:00",300
   "session_1","event_4","2015-01-01 11:45:00",300
   "session_2","event_1","2015-01-01 10:10:00",100
   "session_2","event_2","2015-01-01 11:00:00",500"""

Create an RDD from the text input; the same lines could instead be read from a file using sc.textFile.

import org.joda.time.format._
import org.joda.time._

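// The timestamp field keeps its surrounding double quotes after the split,
// so the pattern contains literal quote characters as well.
def strToTime(s: String): Long = { 
    DateTimeFormat.forPattern(""""yyyy-MM-dd HH:mm:ss"""")
                  .parseDateTime(s).getMillis() / 1000  // seconds since the epoch
}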

val r1 = sc.parallelize(input.split("\n"))
           .map(_.trim.split(","))                // trim the indentation left by the multi-line string
           .map(x => (x(0), (x(1), x(2), x(3))))
           .groupBy(_._1)                         // one group per session
           .map(_._2.map{ case (s, (e, timestr, r)) => 
                              (s, (e, strToTime(timestr), r))}
                    .toArray
                    .sortBy{ case (session, (event, time, records)) => time })

This converts each timestamp, such as "2015-01-01 10:10:00", to seconds since the epoch and sorts the events of each session by time.

val r2 = r1.map(x => x :+ (x.last match { 
                            case (session, (event, time, records)) => 
                                 (session, (event, time, "0")) }))

This appends an extra event to each session that copies the last event except for its record count, which is set to "0". The copy gives the last real event a successor with the same timestamp, so its computed duration is 0.

Use sliding to get pairs of events.

val r3 = r2.map(x => x.sliding(2).toArray)

val r4 = r3.map(x => x.map{ 
        case Array((s1, (e1, t1, c1)), (s2, (e2, t2, c2)))  => 
                   (s1, (e1, (t2 - t1)/60, c1)) } )
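
To see the sentinel-plus-sliding step in isolation, here is a minimal sketch on an ordinary Scala collection, with no Spark involved. SlidingDemo and minutesToNext are hypothetical names, and the input is assumed to be one session's (event, epochSeconds) pairs already sorted by time.

```scala
object SlidingDemo {
  // Each event's duration is the gap to the next event; the last event gets 0
  // because it is paired with a copy of itself (the sentinel).
  def minutesToNext(events: Seq[(String, Long)]): Seq[(String, Long)] =
    if (events.isEmpty) Seq.empty
    else (events :+ events.last)   // sentinel: repeat the last event
      .sliding(2)                  // consecutive pairs (current, next)
      .map { case Seq((e1, t1), (_, t2)) => (e1, (t2 - t1) / 60) }
      .toList
}

// Usage (seconds counted from the first event of session_1):
// SlidingDemo.minutesToNext(Seq(("event_1", 0L), ("event_2", 3000L), ("event_3", 4800L), ("event_4", 5700L)))
// gives List((event_1,50), (event_2,30), (event_3,15), (event_4,0))
```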

Use scan to accumulate the record counts into a running total.

val r5 = r4.map(x => x.zip(x.map{ case (s, (e, t, r)) => r.toInt}
                            .scan(0)(_+_)
                            .drop(1)))
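
The scan-then-drop idiom may be easier to follow on a bare list of counts. The sketch below uses scanLeft, which makes the left-to-right order explicit; RunningTotal and cumulative are hypothetical names used only for illustration.

```scala
object RunningTotal {
  // scanLeft emits the seed (0) followed by every partial sum,
  // so the seed is dropped to leave one total per input element.
  def cumulative(records: Seq[Int]): Seq[Int] =
    records.scanLeft(0)(_ + _).tail
}

// Usage with the record counts of session_1:
// RunningTotal.cumulative(Seq(100, 500, 300, 300))  // Seq(100, 600, 900, 1200)
```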

val r6 = r5.map(x => x.map{ case ((s, (e, t, r)), recordstillnow) =>
                             s"${s},${e},${t},${recordstillnow}" })

val r7 = r6.flatMap(x => x)

r7.collect.mkString("\n")
//"session_2","event_1",50,100
//"session_2","event_2",0,600
//"session_1","event_1",50,100
//"session_1","event_2",30,600
//"session_1","event_3",15,900
//"session_1","event_4",0,1200
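
For checking the logic without a Spark context, the whole pipeline can be approximated on plain Scala collections. This is only a sketch under stated assumptions: it uses java.time rather than Joda-Time, treats timestamps as UTC (the data names no zone), splits naively on commas (no embedded commas in fields), strips the quotes from the output, and sorts sessions by name so the order is deterministic. SessionReport, report, unquote and toSeconds are hypothetical names.

```scala
import java.time.{LocalDateTime, ZoneOffset}
import java.time.format.DateTimeFormatter

object SessionReport {
  private val fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")

  // Remove surrounding whitespace and double quotes from a CSV field.
  private def unquote(s: String): String = s.trim.stripPrefix("\"").stripSuffix("\"")

  // Epoch seconds, treating the timestamp as UTC (an assumption).
  private def toSeconds(ts: String): Long =
    LocalDateTime.parse(ts, fmt).toEpochSecond(ZoneOffset.UTC)

  def report(lines: Seq[String]): Seq[String] = {
    val rows = lines.map(_.split(",").map(unquote)).map {
      case Array(session, event, ts, records) => (session, event, toSeconds(ts), records.toInt)
    }
    rows.groupBy(_._1).toSeq.sortBy(_._1).flatMap { case (session, evs) =>
      val sorted = evs.sortBy(_._3)
      // Start time of the following event; the last event is paired with itself.
      val nextTimes = sorted.map(_._3).drop(1) :+ sorted.last._3
      // Running total of records within the session.
      val totals = sorted.map(_._4).scanLeft(0)(_ + _).tail
      sorted.zip(nextTimes).zip(totals).map { case (((_, event, t, _), next), total) =>
        s"$session,$event,${(next - t) / 60},$total"
      }
    }
  }
}
```

Calling SessionReport.report on the six lines of test.csv should yield the same numbers as r7 above, with session_1 listed before session_2.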

Upvotes: 3
