Newbie

Reputation: 741

Spark: How to get the day difference between a DataFrame column and a timestamp in Scala

How do I get the date difference (number of days in between) in a Scala DataFrame?

I have a df: [id: string, itemName: string, eventTimeStamp: timestamp] and a startTime (a timestamp string). How do I get a column "Daydifference" holding the number of days between the two (startTime - eventTimeStamp)?

My Code :

Initial df :

+------------+-----------+-------------------------+
|   id       |  itemName |    eventTimeStamp       |
+------------+-----------+-------------------------+
|   1        |  TV       |    2016-09-19T00:00:00Z |
|   1        |  Movie    |    2016-09-19T00:00:00Z |
|   1        |  TV       |    2016-09-26T00:00:00Z |
|   2        |  TV       |    2016-09-18T00:00:00Z |
+------------+-----------+-------------------------+

I need to get the most recent eventTimeStamp for each id and itemName, so I did:

   val result = df.groupBy("id", "itemName").agg(max("eventTimeStamp") as "mostRecent")


+------------+-----------+-------------------------+
|   id       |  itemName |    mostRecent           |
+------------+-----------+-------------------------+
|   1        |  TV       |    2016-09-26T00:00:00Z |
|   1        |  Movie    |    2016-09-19T00:00:00Z |
|   2        |  TV       |    2016-09-26T00:00:00Z |
+------------+-----------+-------------------------+

Now I need to get the date difference between mostRecent and startTime (2016-09-29T00:00:00Z), so that I can get:

   { id : 1, {"itemMap" : {"TV" : 3, "Movie" : 10 }} } 
   { id : 2, {"itemMap" : {"TV" : 3}} } 

I tried this:

   val startTime = "2016-09-26T00:00:00Z"

   val result = df.groupBy("id", "itemName").agg(datediff(startTime, max("eventTimeStamp")) as Daydifference)

   case class Data (itemMap : Map[String, Long]) extends Serializable

   result.map {
     case r =>
       val id = r.getAs[String]("id")
       val itemName = r.getAs[String]("itemName")
       val Daydifference = r.getAs[Long]("Daydifference")

       (id, Map(itemName -> Daydifference))
   }.reduceByKey((x, y) => x ++ y).map {
     case (k, v) =>
       (k, JacksonUtil.toJson(Data(v)))
   }

But I am getting an error on datediff. Can someone tell me how to achieve this?

Upvotes: 1

Views: 3197

Answers (1)

Tzach Zohar

Reputation: 37852

When you want to use a constant ("literal") value as a Column in a DataFrame, wrap it with the lit(...) function. The other error here is passing a String as the start date: datediff takes Column arguments, so to compare the start date with a timestamp column you can use a java.sql.Date:

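import org.apache.spark.sql.functions.{datediff, lit, max}

// Date.valueOf parses "yyyy-[m]m-[d]d". The deprecated Date(year, month, day)
// constructor expects year - 1900 and a zero-based month, so it is easy to get wrong.
val startTime = java.sql.Date.valueOf("2016-09-26")

val result = df.groupBy("id", "itemName")
  .agg(datediff(lit(startTime), max("eventTimeStamp")) as "Daydifference")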

result.show()
//  +---+--------+-------------+
//  | id|itemName|Daydifference|
//  +---+--------+-------------+
//  |  1|   Movie|            7|
//  |  1|      TV|            0|
//  |  2|      TV|            0|
//  +---+--------+-------------+
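
If you would rather keep the start time as the ISO-8601 string from the question, one option is to wrap it in to_date(lit(...)), which casts the string to a date and keeps only the yyyy-MM-dd part. The following is a minimal sketch of the whole pipeline under some assumptions: Spark 2.x or later (for SparkSession), sample rows taken from the question's "mostRecent" table, and a hypothetical helper named itemMaps that is not part of the question or of Spark. It stops at a plain Scala Map and leaves the JSON step to the asker's own JacksonUtil.

```scala
import java.sql.Timestamp
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{datediff, lit, max, to_date}

object DayDifferenceSketch {

  // Hypothetical helper: days from each (id, itemName) group's latest event
  // to startTime, collected to the driver as id -> (itemName -> days).
  def itemMaps(df: DataFrame, startTime: String): Map[String, Map[String, Int]] = {
    val result = df.groupBy("id", "itemName")
      // to_date casts the string literal to a date (time-of-day part is dropped)
      .agg(datediff(to_date(lit(startTime)), max("eventTimeStamp")) as "Daydifference")

    result.rdd
      // datediff produces an IntegerType column, so read it as Int, not Long
      .map(r => (r.getAs[String]("id"),
                 Map(r.getAs[String]("itemName") -> r.getAs[Int]("Daydifference"))))
      .reduceByKey(_ ++ _) // merge the per-item maps of each id
      .collect()
      .toMap
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("day-difference-sketch")
      .getOrCreate()
    import spark.implicits._

    // Assumed sample data: id 2 uses 2016-09-26, as in the "mostRecent" table
    val df = Seq(
      ("1", "TV",    Timestamp.valueOf("2016-09-19 00:00:00")),
      ("1", "Movie", Timestamp.valueOf("2016-09-19 00:00:00")),
      ("1", "TV",    Timestamp.valueOf("2016-09-26 00:00:00")),
      ("2", "TV",    Timestamp.valueOf("2016-09-26 00:00:00"))
    ).toDF("id", "itemName", "eventTimeStamp")

    itemMaps(df, "2016-09-29T00:00:00Z").foreach(println)

    spark.stop()
  }
}
```

With these rows and a start time of 2016-09-29, id 1 should map to TV -> 3 and Movie -> 10, and id 2 to TV -> 3, which is the shape the question asks for. Note that the question's r.getAs[Long]("Daydifference") would likely fail at runtime, because datediff returns an integer column.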

Upvotes: 1
