karan kumar

Reputation: 33

How to handle different date formats in a CSV file while reading a DataFrame in Spark using option("dateFormat")?

I have the input file below:

INPUTFILE_CNTRY_CODE|INPUTFILE_CTY_CODE|INPUTFILE_ID|INPUTFILE_LTY_ID|INPUTFILE_CNSM_ID|INPUTFILE_DATE|INPUTFILE_TIME|INPUTFILE_TRDATE
GBR|263|326735246||I034867789V|15/11/30|2015-11-30 00:00:00.000000|2016-22-06
GBR|263|397802068|PC7135361|PC7135361|16/05/20|2016-10-06 11:50:05.000000|2016-22-07

and I am trying to read it as follows:

val registeration_schema = StructType(List(
  StructField("INPUTFILE_CNTRY_CODE", StringType),
  StructField("INPUTFILE_CTY_CODE", IntegerType),
  StructField("INPUTFILE_ID", IntegerType),
  StructField("INPUTFILE_LTY_ID", StringType),
  StructField("INPUTFILE_CNSM_ID", StringType),
  StructField("INPUTFILE_DATE", DateType),
  StructField("INPUTFILE_TIME", TimestampType),
  StructField("INPUTFILE_TRDATE", DateType)
))
val registerationDF = spark.read
  .option("header", "true")
  .option("delimiter", "|")
  .option("mode", "FAILFAST")
  .schema(registeration_schema)
  .option("dateFormat", "yy/M/d")
  .option("timestampFormat", "yyyy-MM-dd HH:mm:ss.SSSSSS")
  .csv("registration2.csv")

I am getting the error below:

Caused by: org.apache.spark.sql.catalyst.util.BadRecordException: java.time.format.DateTimeParseException: Text '2016-22-06' could not be parsed at index 2
    at org.apache.spark.sql.catalyst.csv.UnivocityParser.org$apache$spark$sql$catalyst$csv$UnivocityParser$$convert(UnivocityParser.scala:262)
    at org.apache.spark.sql.catalyst.csv.UnivocityParser.$anonfun$doParse$2(UnivocityParser.scala:200)
    at org.apache.spark.sql.catalyst.csv.UnivocityParser.parse(UnivocityParser.scala:207)
    at org.apache.spark.sql.catalyst.csv.UnivocityParser$.$anonfun$parseIterator$1(UnivocityParser.scala:347)
    at org.apache.spark.sql.catalyst.util.FailureSafeParser.parse(FailureSafeParser.scala:60)
    ... 27 more
Caused by: java.time.format.DateTimeParseException: Text '2016-22-06' could not be parsed at index 2

This happens because the file contains two different date formats and I have specified only one of them while loading the DataFrame. Can someone explain how to handle more than one date format while reading a CSV into a DataFrame?

Upvotes: 3

Views: 1303

Answers (1)

Shantanu Kher

Reputation: 1054

You cannot define more than one format for DateType while loading a CSV, because the dateFormat option applies to every DateType column in the schema. You can work around this with the date_format() and to_date() functions, available in Spark 2.2+.
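
To illustrate why a single pattern cannot cover both columns, here is a minimal sketch using plain java.time, which is the parser named in the question's stack trace. The object name SinglePatternDemo and the helper parse are hypothetical and exist only for this illustration; the sketch does not involve Spark itself.

```scala
import java.time.LocalDate
import java.time.format.DateTimeFormatter
import scala.util.Try

object SinglePatternDemo {
  // Pattern taken from the question's dateFormat option.
  val shortFmt: DateTimeFormatter = DateTimeFormatter.ofPattern("yy/M/d")
  // Pattern that matches the INPUTFILE_TRDATE values (year-day-month).
  val trFmt: DateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-dd-MM")

  // Hypothetical helper: wraps the parse so a failure is a value, not an exception.
  def parse(text: String, fmt: DateTimeFormatter): Try[LocalDate] =
    Try(LocalDate.parse(text, fmt))

  def main(args: Array[String]): Unit = {
    // INPUTFILE_DATE value with its own pattern: parses.
    println(parse("15/11/30", shortFmt))
    // INPUTFILE_TRDATE value with the same pattern: fails, as in the question.
    println(parse("2016-22-06", shortFmt))
    // INPUTFILE_TRDATE value with a matching pattern: parses.
    println(parse("2016-22-06", trFmt))
  }
}
```

The second call fails because "yy" consumes only the first two digits and the parser then expects a "/" separator, which is consistent with the "could not be parsed at index 2" message in the question.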

The high-level steps are:

  1. Declare one of the two date columns as StringType in the schema. I chose INPUTFILE_DATE for this demonstration.
  2. Convert INPUTFILE_DATE to DateType afterwards using the date_format() and to_date() functions with the pattern that matches its values.

Define the schema with INPUTFILE_DATE as StringType, and set dateFormat to the pattern of the remaining DateType column (INPUTFILE_TRDATE):

import org.apache.spark.sql.types._
import org.apache.spark.sql.functions.{col, date_format, to_date}

val registeration_schema = StructType(List(
  StructField("INPUTFILE_CNTRY_CODE", StringType),
  StructField("INPUTFILE_CTY_CODE", IntegerType),
  StructField("INPUTFILE_ID", IntegerType),
  StructField("INPUTFILE_LTY_ID", StringType),
  StructField("INPUTFILE_CNSM_ID", StringType),
  StructField("INPUTFILE_DATE", StringType),   // read as string, converted later
  StructField("INPUTFILE_TIME", TimestampType),
  StructField("INPUTFILE_TRDATE", DateType)    // parsed with dateFormat below
))
val registerationDF = spark.read
  .option("header", "true")
  .option("delimiter", "|")
  .option("mode", "FAILFAST")
  .schema(registeration_schema)
  .option("dateFormat", "yyyy-dd-MM")
  .option("timestampFormat", "yyyy-MM-dd HH:mm:ss.SSSSSS")
  .csv("registration2.csv")

The core of the solution is:

val targetDF = registerationDF.withColumn("INPUTFILE_DATE",
  to_date(date_format(to_date(col("INPUTFILE_DATE"), "yy/MM/dd"), "yyyy-dd-MM"), "yyyy-dd-MM"))

End result:

scala> targetDF.printSchema()
root
 |-- INPUTFILE_CNTRY_CODE: string (nullable = true)
 |-- INPUTFILE_CTY_CODE: integer (nullable = true)
 |-- INPUTFILE_ID: integer (nullable = true)
 |-- INPUTFILE_LTY_ID: string (nullable = true)
 |-- INPUTFILE_CNSM_ID: string (nullable = true)
 |-- INPUTFILE_DATE: date (nullable = true)
 |-- INPUTFILE_TIME: timestamp (nullable = true)
 |-- INPUTFILE_TRDATE: date (nullable = true)


scala> targetDF.show()

+--------------------+------------------+------------+----------------+-----------------+--------------+-------------------+----------------+
|INPUTFILE_CNTRY_CODE|INPUTFILE_CTY_CODE|INPUTFILE_ID|INPUTFILE_LTY_ID|INPUTFILE_CNSM_ID|INPUTFILE_DATE|     INPUTFILE_TIME|INPUTFILE_TRDATE|
+--------------------+------------------+------------+----------------+-----------------+--------------+-------------------+----------------+
|                 GBR|               263|   326735246|            null|      I034867789V|    2015-11-30|2015-11-30 00:00:00|      2016-06-22|
|                 GBR|               263|   397802068|       PC7135361|        PC7135361|    2016-05-20|2016-10-06 11:50:05|      2016-07-22|
+--------------------+------------------+------------+----------------+-----------------+--------------+-------------------+----------------+
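
As a possible variation on the same idea, both date columns could be read as strings and each converted with its own pattern, so that the reader's dateFormat option is not needed at all. The sketch below is only an illustration under stated assumptions: the object name PerColumnDateFormats and the helper parseDates are hypothetical, a local SparkSession is assumed, and the in-memory rows stand in for the two date fields of registration2.csv.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, to_date}

object PerColumnDateFormats {
  // Hypothetical helper: converts each string column with the pattern
  // that matches its own values.
  def parseDates(df: DataFrame): DataFrame =
    df.withColumn("INPUTFILE_DATE", to_date(col("INPUTFILE_DATE"), "yy/MM/dd"))
      .withColumn("INPUTFILE_TRDATE", to_date(col("INPUTFILE_TRDATE"), "yyyy-dd-MM"))

  def main(args: Array[String]): Unit = {
    // Assumption: a local session is acceptable for this demonstration.
    val spark = SparkSession.builder()
      .appName("per-column-date-formats")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    // Date values copied from the sample file. In practice the DataFrame
    // would come from spark.read ... csv with both fields as StringType.
    val raw = Seq(
      ("15/11/30", "2016-22-06"),
      ("16/05/20", "2016-22-07")
    ).toDF("INPUTFILE_DATE", "INPUTFILE_TRDATE")

    val parsed = parseDates(raw)
    parsed.printSchema() // both columns should now be of type date
    parsed.show()

    spark.stop()
  }
}
```

The design choice here is to keep parsing rules next to the column they apply to, which scales to any number of date columns with different patterns.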

Upvotes: 1
