Kepler

Reputation: 429

How to pick the earliest timestamp record from an RDD in Scala

I have an RDD of the shape ((String, String), Timestamp). I have a large number of records and I want to select, for each key, the record with the earliest Timestamp value. I have tried the following code and am still struggling to do this. Can anybody help me?

The code below is what I tried; it is wrong and does not work:

val context = sparkSession.read.format("jdbc")
  .option("driver", "com.mysql.jdbc.Driver")
  .option("url", url)
  .option("dbtable", "student_risk")
  .option("user", "user")
  .option("password", "password")
  .load()
context.cache();

val studentRDD = context.rdd.map(r => ((r.getString(r.fieldIndex("course_id")), r.getString(r.fieldIndex("student_id"))), r.getTimestamp(r.fieldIndex("risk_date_time"))))
val filteredRDD = studentRDD.collect().map(z => (z._1, z._2)).reduce((x, y) => (x._2.compareTo(y._2)))

Upvotes: 3

Views: 2536

Answers (2)

Assaf Mendelson

Reputation: 13001

First, your code produces incorrect results because the reduce is wrong: the function passed to reduce returns an Int (the result of compareTo) instead of the (key, timestamp) pair, and an Int has no ._2 member, so it does not even type-check. To correct this, try:

  studentRDD.collect().map(z => (z._1, z._2)).reduce((x, y) => if (x._2.compareTo(y._2) < 0) x else y)._1

Basically, this new function keeps whichever record has the smaller time, and on the overall result (the earliest record) ._1 extracts the key.
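
As a minimal illustration of how this comparator behaves, here is a sketch on hypothetical in-memory data (the keys and timestamps below are made up):

import java.sql.Timestamp

// Hypothetical records in the same ((course_id, student_id), risk_date_time) shape
val sample = Seq(
  (("c1", "s1"), Timestamp.valueOf("2017-01-03 10:00:00")),
  (("c2", "s2"), Timestamp.valueOf("2017-01-01 10:00:00")),
  (("c1", "s2"), Timestamp.valueOf("2017-01-02 10:00:00")))

// Keep whichever pair has the smaller timestamp; ._1 then extracts its key
val earliestKey = sample.reduce((x, y) => if (x._2.compareTo(y._2) < 0) x else y)._1
// earliestKey == ("c2", "s2")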

Note that you are doing all of this on the driver because of the collect. There is no reason to collect: map and reduce also work directly on RDDs, so you can get the same result (and stay scalable) by doing this:

studentRDD.map(z => (z._1, z._2)).reduce((x, y) => if (x._2.compareTo(y._2) < 0) x else y)._1

You can do this directly from your context dataframe though:

val targetRow = context
  .agg(min(struct('risk_date_time, 'course_id, 'student_id)) as "rec")
  .select($"rec.*")
  .collect()(0)
val key = (targetRow.getString(1), targetRow.getString(2))
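
Note that this snippet assumes min and struct from org.apache.spark.sql.functions, plus the session implicits for the 'symbol and $"..." column syntax:

import org.apache.spark.sql.functions.{min, struct}
import sparkSession.implicits._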

Upvotes: 3

Tzach Zohar

Reputation: 37852

It's easy to do this directly on the DataFrame (oddly named context here):

val result = context
  .groupBy("course_id", "student_id")
  .agg(min("risk_date_time") as "risk_date_time")

Then you can convert it into an RDD (if needed) as you did before; the result has the same schema.
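
A sketch of that conversion, assuming you want the same ((course_id, student_id), risk_date_time) shape as the original studentRDD:

val resultRDD = result.rdd.map(r => (
  (r.getString(r.fieldIndex("course_id")), r.getString(r.fieldIndex("student_id"))),
  r.getTimestamp(r.fieldIndex("risk_date_time"))))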

If you DO want to perform this over the RDD, use reduceByKey:

studentRDD.reduceByKey((t1, t2) => if (t1.before(t2)) t1 else t2)
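
For example, with the studentRDD from the question, collecting the per-key minima could look like this (the printed output is just illustrative):

val earliestPerKey = studentRDD.reduceByKey((t1, t2) => if (t1.before(t2)) t1 else t2)
// One ((course_id, student_id), earliest risk_date_time) pair per key
earliestPerKey.collect().foreach { case ((course, student), ts) =>
  println(s"$course / $student -> $ts")
}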

Upvotes: 7
