Stark
Stark

Reputation: 634

Spark : Parse a Date / Timestamps with different Formats (MM-dd-yyyy HH:mm, MM/dd/yy H:mm ) in same column of a Dataframe

The problem is: I have a dataset where a column having 2 or more types of date format. In general I select all values as String type and then use the to_date to parse the date. But I don't know how do I parse a column having two or more types of date formats.

val DF= Seq(("02-04-2020 08:02"),("03-04-2020 10:02"),("04-04-2020 09:00"),("04/13/19 9:12"),("04/14/19 2:13"),("04/15/19 10:14"), ("04/16/19 5:15")).toDF("DOB")

import org.apache.spark.sql.functions.{to_date, to_timestamp}
val DOBDF = DF.withColumn("Date", to_date($"DOB", "MM/dd/yyyy"))

Output from the above command:

null
null
null
0019-04-13
0019-04-14
0019-04-15
0019-04-16

The code above I have written is not working for the format MM/dd/yyyy and the format which did not provided for that I am getting the null as a output.

So seeking the help to parse the file with different date formats. If possible kindly also share some tutorial or notes to the deal with the date formats. Please note: I am using Scala for the spark framework.

Thanks in advance.

Upvotes: 3

Views: 5217

Answers (3)

girip11
girip11

Reputation: 169

We can use coalesce function as mentioned in the accepted answer. On each format mismatch, to_date returns null, which makes coalesce to move to the next format in the list.

But with to_date, if you have issues in parsing the correct year component in the date in yy format (In the date 7-Apr-50, if you want 50 to be parsed as 1950 or 2050), refer to this stackoverflow post

  import org.apache.spark.sql.functions.coalesce

  // Reference: https://spark.apache.org/docs/3.0.0/sql-ref-datetime-pattern.html
  val parsedDateCol: Column = coalesce(
    // Four letters of M looks for full name of the Month
    to_date(col("original_date"), "MMMM, yyyy"),
    to_date(col("original_date"), "dd-MMM-yy"),
    to_date(col("original_date"), "yyyy-MM-dd"),
    to_date(col("original_date"), "d-MMM-yy")
  )

  // I have used some dummy dataframe name.
  dataframeWithDateCol.select(
      parsedDateCol.as("parsed_date")
    )
    .show()

Upvotes: 0

ValaravausBlack
ValaravausBlack

Reputation: 691

Check EDIT section to use Column functions instead of UDF for performance benefits in later part of this solution --

Well, Let's do it try-catch way.. Try a column conversion against each format and keep the success value. You may have to provide all possible format from outside as parameter or keep a master list of all possible formats somewhere in code itself..

Here is the possible solution.. ( Instead of SimpleDateFormatter which sometimes have issues on timestamps beyond milliseconds, I use new library - java.time.format.DateTimeFormatter)

Create a to_timestamp Function, which accepts string to convert to timestamp and all possible Formats

  import java.time.LocalDate
  import java.time.LocalDateTime
  import java.time.LocalTime
  import java.time.format.DateTimeFormatter
  import scala.util.Try

def toTimestamp(date: String, tsformats: Seq[String]): Option[java.sql.Timestamp] = {

    val out = (for (tsft <- tsformats) yield {
      val formatter = new DateTimeFormatterBuilder()
        .parseCaseInsensitive()
        .appendPattern(tsft).toFormatter()
      if (Try(java.sql.Timestamp.valueOf(LocalDateTime.parse(date, formatter))).isSuccess)
        Option(java.sql.Timestamp.valueOf(LocalDateTime.parse(date, formatter)))
      else None

    }).filter(_.isDefined)
    if (out.isEmpty) None else out.head
  }

Create a UDF on top of it - ( this udf takes Seq of Format strings as parameter)

 def UtoTimestamp(tsformats: Seq[String]) = org.apache.spark.sql.functions.udf((date: String) => toTimestamp(date, tsformats))

And now, simply use it in your spark code.. Here's the test with your Data -

    val DF = Seq(("02-04-2020 08:02"), ("03-04-2020 10:02"), ("04-04-2020 09:00"), ("04/13/19 9:12"), ("04/14/19 2:13"), ("04/15/19 10:14"), ("04/16/19 5:15")).toDF("DOB")

    val tsformats = Seq("MM-dd-yyyy HH:mm", "MM/dd/yy H:mm")

    DF.select(UtoTimestamp(tsformats)('DOB)).show

And here is the output -

+-------------------+
|           UDF(DOB)|
+-------------------+
|2020-02-04 08:02:00|
|2020-03-04 10:02:00|
|2020-04-04 09:00:00|
|2019-04-13 09:12:00|
|2019-04-14 02:13:00|
|2019-04-15 10:14:00|
|2019-04-16 05:15:00|
+-------------------+


Cherry on top would be to avoid having to write UtoTimestamp(colname) for many columns in your dataframe. Let's write a function which accepts a Dataframe, List of all Timestamp columns, And all possible formats which your source data may have coded timestamps in..

It'd parse all timestamp columns for you with trying against formats..

def WithTimestampParsed(df: DataFrame, tsCols: Seq[String], tsformats: Seq[String]): DataFrame = {

    val colSelector = df.columns.map {
      c =>
        {
          if (tsCols.contains(c)) UtoTimestamp(tsformats)(col(c)) alias (c)
          else col(c)
        }
    }

Use it like this -

// You can pass as many column names in a sequence to be parsed
WithTimestampParsed(DF, Seq("DOB"), tsformats).show

Output -

+-------------------+
|                DOB|
+-------------------+
|2020-02-04 08:02:00|
|2020-03-04 10:02:00|
|2020-04-04 09:00:00|
|2019-04-13 09:12:00|
|2019-04-14 02:13:00|
|2019-04-15 10:14:00|
|2019-04-16 05:15:00|
+-------------------+

EDIT - I saw latest spark code, and they are also using java.time._ utils now to parse dates and timestamps which enable handling beyond Milliseconds.. Earlier these functions were based on SimpleDateFormat ( I wasn't relying on to_timestamps of spark earlier due to this limit) .

So with to_date & to_timestamp functions being so reliable now.. Let's use them instead of having to write a UDF.. Let's write a function which operates on Columns.

def to_timestamp_simple(col: org.apache.spark.sql.Column, formats: Seq[String]): org.apache.spark.sql.Column = {
    coalesce(formats.map(fmt => to_timestamp(col, fmt)): _*)
  }

and with this WithTimestampParsedwould look like -

def WithTimestampParsedSimple(df: DataFrame, tsCols: Seq[String], tsformats: Seq[String]): DataFrame = {

    val colSelector = df.columns.map {
      c =>
        {
          if (tsCols.contains(c)) to_timestamp_simple(col(c), tsformats) alias (c)
          else col(c)
        }
    }

    df.select(colSelector: _*)
  }

And use it like -

DF.select(to_timestamp_simple('DOB,tsformats)).show

//OR

WithTimestampParsedSimple(DF, Seq("DOB"), tsformats).show

Output looks like -

+---------------------------------------------------------------------------------------+
|coalesce(to_timestamp(`DOB`, 'MM-dd-yyyy HH:mm'), to_timestamp(`DOB`, 'MM/dd/yy H:mm'))|
+---------------------------------------------------------------------------------------+
|                                                                    2020-02-04 08:02:00|
|                                                                    2020-03-04 10:02:00|
|                                                                    2020-04-04 09:00:00|
|                                                                    2019-04-13 09:12:00|
|                                                                    2019-04-14 02:13:00|
|                                                                    2019-04-15 10:14:00|
|                                                                    2019-04-16 05:15:00|
+---------------------------------------------------------------------------------------+

+-------------------+
|                DOB|
+-------------------+
|2020-02-04 08:02:00|
|2020-03-04 10:02:00|
|2020-04-04 09:00:00|
|2019-04-13 09:12:00|
|2019-04-14 02:13:00|
|2019-04-15 10:14:00|
|2019-04-16 05:15:00|
+-------------------+

Upvotes: 5

Chema
Chema

Reputation: 2838

I put some code that maybe can help you in some way. I tried this

mport org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession
import java.sql.Date
import java.util.{GregorianCalendar}


object DateFormats {

  val spark = SparkSession
    .builder()
    .appName("Multiline")
    .master("local[*]")
    .config("spark.sql.shuffle.partitions", "4") //Change to a more reasonable default number of partitions for our data
    .config("spark.app.id", "Multiline")  // To silence Metrics warning
    .getOrCreate()

  val sc = spark.sparkContext

  def main(args: Array[String]): Unit = {

    Logger.getRootLogger.setLevel(Level.ERROR)


    try {

      import spark.implicits._

      val DF = Seq(("02-04-2020 08:02"),("03-04-2020 10:02"),("04-04-2020 09:00"),("04/13/19 9:12"),("04/14/19 2:13"),("04/15/19 10:14"), ("04/16/19 5:15")).toDF("DOB")

      import org.apache.spark.sql.functions.{to_date, to_timestamp}
      val DOBDF = DF.withColumn("Date", to_date($"DOB", "MM/dd/yyyy"))

      DOBDF.show()

      // todo: my code below
      DF
        .rdd
        .map(r =>{
            if(r.toString.contains("-")) {
              val dat = r.toString.substring(1,11).split("-")
              val calendar = new GregorianCalendar(dat(2).toInt,dat(1).toInt - 1,dat(0).toInt)
              (r.toString, new Date(calendar.getTimeInMillis))
            } else {
              val dat = r.toString.substring(1,9).split("/")
              val calendar = new GregorianCalendar(dat(2).toInt + 2000,dat(0).toInt - 1,dat(1).toInt)
              (r.toString, new Date(calendar.getTimeInMillis))
            }

        })
        .toDF("DOB","DATE")
        .show()

      // To have the opportunity to view the web console of Spark: http://localhost:4040/
      println("Type whatever to the console to exit......")
      scala.io.StdIn.readLine()
    } finally {
      sc.stop()
      println("SparkContext stopped.")
      spark.stop()
      println("SparkSession stopped.")
    }
  }
}
+------------------+----------+
|               DOB|      DATE|
+------------------+----------+
|[02-04-2020 08:02]|2020-04-02|
|[03-04-2020 10:02]|2020-04-03|
|[04-04-2020 09:00]|2020-04-04|
|   [04/13/19 9:12]|2019-04-13|
|   [04/14/19 2:13]|2019-04-14|
|  [04/15/19 10:14]|2019-04-15|
|   [04/16/19 5:15]|2019-04-16|
+------------------+----------+

Regards

Upvotes: 0

Related Questions