Vijay

Reputation: 974

Add new columns by casting columns to a given type dynamically in a Spark DataFrame

I have two DataFrames with the same column names but different schemas:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

val schema = StructType(
  StructField("firstName", StringType, true) ::
  StructField("lastName", IntegerType, false) ::
  StructField("lod", DateType, false) ::
  StructField("dob", DateType, false) :: Nil)

val df = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], schema)

val df2 = Seq(
  ("eturi", "1", "15/10/2020", "Jun/01/2012"),
  ("vijaynull", "12", "25/08/2001", "Aug/14/2014"),
  ("lakshmi", "xyz", null, "Jul/15/2015")
).toDF("firstName", "lastName", "lod", "dob")

df2.printSchema
root
 |-- firstName: string (nullable = true)
 |-- lastName: string (nullable = true)
 |-- lod: string (nullable = true)
 |-- dob: string (nullable = true)

Now I need to cast each column of df2 to the data type of the matching column in df, adding a new column for each cast. Date and timestamp columns must be parsed using the patterns given in a separate list.

import org.apache.spark.sql.functions.{to_date, unix_timestamp}

val date_formats = List("dd/MM/yyyy", "MMM/dd/yyyy")
val df_data_val = df.columns.foldLeft(df2) { case (tmpdf, c) =>
  df.schema(c).dataType match {
    case DateType =>
      // This only checks whether the pattern is in the list, which is always true here,
      // so every date column is parsed with "dd/MM/yyyy" and the later branches never run.
      if (date_formats.contains("dd/MM/yyyy")) {
        tmpdf.withColumn(
          c + "_cast",
          to_date(unix_timestamp(df2(c), "dd/MM/yyyy").cast(TimestampType))
        )
      } else if (date_formats.contains("MMM/dd/yyyy")) {
        tmpdf.withColumn(
          c + "_cast",
          to_date(unix_timestamp(df2(c), "MMM/dd/yyyy").cast(TimestampType))
        )
      } else {
        tmpdf.withColumn(c + "_cast", df2(c).cast(df.schema(c).dataType))
      }
    case _ => tmpdf.withColumn(c + "_cast", df2(c).cast(df.schema(c).dataType))
  }
}

Output I got: (image not available)

Expected output: (image not available)

Upvotes: 0

Views: 253

Answers (1)

mck

Reputation: 42422

You can handle the different date formats with coalesce instead of if/else. Parse the column with every pattern and let coalesce keep the first non-null result:

val date_formats = List("dd/MM/yyyy","MMM/dd/yyyy")

val df_data_val = df.columns.foldLeft(df2) { case (tmpdf, c) =>
  df.schema(c).dataType match {
    case DateType => tmpdf.withColumn(
      c + "_cast", 
      coalesce(date_formats.map(f => to_date(unix_timestamp(df2(c), f).cast(TimestampType))):_*)
    )
    case _ => tmpdf.withColumn(c + "_cast", df2(c).cast(df.schema(c).dataType))
  }
}

df_data_val.show
+---------+--------+----------+-----------+--------------+-------------+----------+----------+
|firstName|lastName|       lod|        dob|firstName_cast|lastName_cast|  lod_cast|  dob_cast|
+---------+--------+----------+-----------+--------------+-------------+----------+----------+
|    eturi|       1|15/10/2020|Jun/01/2012|         eturi|            1|2020-10-15|2012-06-01|
|vijaynull|      12|25/08/2001|Aug/14/2014|     vijaynull|           12|2001-08-25|2014-08-14|
|  lakshmi|     xyz|      null|Jul/15/2015|       lakshmi|         null|      null|2015-07-15|
+---------+--------+----------+-----------+--------------+-------------+----------+----------+
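
To see why coalesce picks the right value per row, the same first-match-wins logic can be modeled in plain Scala, outside Spark. This is only a rough sketch: FirstMatchingFormat and parseFirst are hypothetical names, and it uses java.time with an English locale rather than Spark's own parser, so edge cases may behave differently.

```scala
import java.time.LocalDate
import java.time.format.DateTimeFormatter
import java.util.Locale
import scala.util.Try

// Hypothetical helper, not part of Spark: a plain-Scala model of
// coalesce(patterns.map(p => to_date(...)): _*).
object FirstMatchingFormat {
  // Try each pattern in order and return the first successful parse.
  // A null input or a value that matches no pattern yields None,
  // which plays the role of a null cell in the DataFrame.
  def parseFirst(raw: String, patterns: Seq[String]): Option[LocalDate] =
    Option(raw).flatMap { s =>
      patterns.iterator
        .map { p =>
          val fmt = DateTimeFormatter.ofPattern(p, Locale.ENGLISH)
          Try(LocalDate.parse(s, fmt)).toOption
        }
        .collectFirst { case Some(date) => date }
    }
}
```

With the patterns from the question, "15/10/2020" is parsed by the first pattern, "Jun/01/2012" fails the first and is parsed by the second, and a null input gives None, matching the null in lod_cast above.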

Upvotes: 1
