Riccardo Lamera

Reputation: 105

Spark (Scala), considering days before 2 years ago

I'm writing a Spark batch job in Scala and need to filter a DataFrame ('driverTable', with column 'date') so that I keep only the dates that are more than 2 years old, discarding all other columns.

val dayList: Seq[Date] = driverTable
        .select("date")
        .as[Date]
        .distinct
        .filter(s"date <= ... ")
        .collect()
        .sortBy(_.getTime)
        .toSeq

driverTable: org.apache.spark.sql.DataFrame = [flowname: string, date: date]

'date' is in java.sql.Date format.

How should I fill in the .filter call? For example, if today is 05/25/2021, I need to keep all days before 05/25/2019. If today is February 29, the threshold is February 28 of two years earlier.

Upvotes: 0

Views: 763

Answers (1)

Coursal

Reputation: 1387

Instead of deliberately casting your date column to java.sql.Date and managing it like a Java object, it's safer to convert it to Spark's native date type with to_date (see the Scala docs), which optionally takes the date format used in your column as a simple String argument (as we will see later on).

After that, all we need to get is:

  • the current date of execution, and
  • a way to calculate the difference between that date and each value in the date column

As for the current date, Spark provides current_timestamp() (see the Scala docs), which we can render in our desired date format by wrapping it in date_format (see the Scala docs), much as to_date is used above. Note that date_format returns a string column rather than a date.

Now, to calculate the difference between the current date and the given date in the date column, we can take advantage of the months_between function (see the Scala docs), which returns the number of months between two dates as a Double. The result can be fractional, and it is positive or negative depending on the order of the arguments, so we take its absolute value. You can also check out this answer to get a better look at its usage.
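To make the behaviour of months_between concrete, here is a minimal sketch that evaluates it for a few date pairs. The object name MonthsBetweenSketch and the helper monthsBetween are hypothetical, and the local SparkSession is only an assumption for trying it out; adapt it to your own session.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, months_between, to_date}

object MonthsBetweenSketch {
  // Hypothetical helper: evaluates months_between(end, start) for two
  // yyyy-MM-dd strings and returns the result as a Double.
  def monthsBetween(spark: SparkSession, end: String, start: String): Double = {
    import spark.implicits._
    Seq((end, start)).toDF("end", "start")
      .select(months_between(to_date(col("end")), to_date(col("start"))))
      .first()
      .getDouble(0)
  }

  def main(args: Array[String]): Unit = {
    // Assumption: a throwaway local session, just for experimenting.
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("months-between-sketch")
      .getOrCreate()

    // Positive and fractional: the first argument is the later date.
    println(monthsBetween(spark, "2021-05-25", "2019-05-10"))
    // Negative: same dates with the arguments swapped.
    println(monthsBetween(spark, "2019-05-10", "2021-05-25"))
    // Same day of the month on both sides: a whole number of months.
    println(monthsBetween(spark, "2021-05-25", "2019-05-25"))

    spark.stop()
  }
}
```

The sign flip in the second call is the reason for wrapping the result in abs when the argument order is not guaranteed.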

Let's say that we have the following input DataFrame df with a yyyy-MM-dd date format in the date column:

+---+----------+
| id|      date|
+---+----------+
|  1|2021-05-25|
|  2|2020-05-26|
|  3|2020-05-20|
|  4|2019-05-26|
|  5|2019-05-10|
+---+----------+

All we need to do is specify the date format of the date column and then keep only the rows of df where the absolute month difference between the current date and the given date is at most 24 months:

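import org.apache.spark.sql.functions.{abs, col, current_timestamp, date_format, months_between, to_date}

// Parse the column as a date, then keep rows within 24 months of the current date
df.select("date")
  .withColumn("date", to_date(col("date"), "yyyy-MM-dd"))
  .filter(abs(months_between(col("date"), date_format(current_timestamp(), "yyyy-MM-dd"))) <= 24)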

This removes the dates that are more than 2 years older than the current date. The output below is what you would get for a run on 2021-05-25, the date used in the question:

+----------+
|      date|
+----------+
|2021-05-25|
|2020-05-26|
|2020-05-20|
|2019-05-26|
+----------+
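Note that the filter above keeps the dates within the last 24 months, whereas the question asks for the dates before the two-year threshold. A minimal sketch of that opposite direction, swapping months_between for a comparison against add_months(current_date(), -24), could look like the following. The object name OlderThanTwoYears and its `today` parameter are hypothetical, introduced only so the example can be run against a fixed date instead of the clock.

```scala
import java.sql.Date
import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import org.apache.spark.sql.functions.{add_months, col, current_date, lit}

object OlderThanTwoYears {
  // Hypothetical helper: keep rows whose `date` is strictly before
  // (today - 24 months). `today` defaults to Spark's current_date().
  def apply(df: DataFrame, today: Column = current_date()): DataFrame =
    df.filter(col("date") < add_months(today, -24))

  def main(args: Array[String]): Unit = {
    // Assumption: a throwaway local session, just for experimenting.
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("older-than-two-years")
      .getOrCreate()
    import spark.implicits._

    // Same dates as the sample DataFrame in this answer.
    val df = Seq("2021-05-25", "2020-05-26", "2020-05-20", "2019-05-26", "2019-05-10")
      .map(s => Date.valueOf(s))
      .toDF("date")

    // Fixed "today" (the date from the question) so the result is reproducible.
    OlderThanTwoYears(df, lit(Date.valueOf("2021-05-25"))).show()

    spark.stop()
  }
}
```

With today fixed at 2021-05-25 the threshold is 2019-05-25, so only 2019-05-10 survives from the sample data. add_months also clamps to the end of the month, so a run on February 29 yields a February 28 threshold two years earlier, matching the rule described in the question.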

Upvotes: 1
