Pradeep
Pradeep

Reputation: 303

How to dynamically rename a column in Spark DF based on the case class

Trying to rename an existing column based on the case class which is a JSON file.

Ex:

// Case class mapped to the JSON config
case class ObjMapping (colName:String,
                  renameCol:Option[String],
                  colOrder:Option[Integer]               
                  )
// JSON config mapped to case class
val newOb = List(ObjMapping("DEPTID",Some("DEPT_ID"),Some(1)), ObjMapping("EMPID",Some("EMP_ID"),Some(4)), ObjMapping("DEPT_NAME",None,Some(2)), ObjMapping("EMPNAME",Some("EMP_NAME"),Some(3)))

Sample source DF.

val empDf = Seq(
         (1,10,"IT","John"),
         (2,20,"DEV","Ed"),
         (2,30,"OPS","Brian")
).toDF("DEPTID","EMPID","DEPT_NAME","EMPNAME")

Based on the above config, wanted to rename the DF columns. So EMPID renamed to EMP_ID and EPNAME to EMP_NAME and DEPTID to DEPT_ID.

Upvotes: 0

Views: 484

Answers (1)

Raphael Roth
Raphael Roth

Reputation: 27373

You can do it like this :

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions._

val selectExpr : Seq[Column] = newOb
  .sortBy(_.colOrder)
  .map(om => col(om.colName).as(om.renameCol.getOrElse(om.colName)))

empDf
  .select(selectExpr:_*)
  .show()

gives:

+-------+---------+--------+------+
|DEPT_ID|DEPT_NAME|EMP_NAME|EMP_ID|
+-------+---------+--------+------+
|      1|       IT|    John|    10|
|      2|      DEV|      Ed|    20|
|      2|      OPS|   Brian|    30|
+-------+---------+--------+------+

Upvotes: 2

Related Questions