jackson smith

Reputation: 65

How to change a DataFrame header based on particular values?

Suppose we have a CSV file with the following header:

fname,age,class,dob

A second CSV file has a header like:

f_name,user_age,class,DataofBith

I am trying to create a common header for all CSV files, so that each one returns a DataFrame with the same standard header, like:

first_name,age,class,dob
val df2 = df.withColumnRenamed("DataofBith","dob").withColumnRenamed("fname","first_name")
df2.printSchema()

But that way is not generic. Can we do this dynamically for all CSV files, following a standard conversion, so that for example both fname and f_name are converted to first_name?

Upvotes: 1

Views: 813

Answers (3)

abiratsis

Reputation: 7316

You can use a simple select in combination with a Scala Map. It is easier to handle the column transformations via a dictionary (Map) in which the key is the old name and the value is the new name.

Let's first create the two datasets as you described them:

import spark.implicits._ // needed for toDF when not running in spark-shell

val df1 = Seq(
  ("toto", 23, "g", "2010-06-09"),
  ("bla", 35, "s", "1990-10-01"),
  ("pino", 12, "a", "1995-10-05")
).toDF("fname", "age", "class", "dob")

val df2 = Seq(
  ("toto", 23, "g", "2010-06-09"),
  ("bla", 35, "s", "1990-10-01"),
  ("pino", 12, "a", "1995-10-05")
).toDF("f_name", "user_age", "class", "DataofBith")

Next, we create a Scala function named transform which accepts two arguments: the target df, and a mapping that contains the transformation details:


val mapping = Map(
  "fname" -> "first_name",
  "f_name" -> "first_name",
  "user_age" -> "age",
  "DataofBith" -> "dob"
)

import org.apache.spark.sql.DataFrame

def transform(df: DataFrame, mapping: Map[String, String]): DataFrame = {
  val keys = mapping.keySet
  val cols = df.columns.map { c =>
    if (keys.contains(c))
      df(c).as(mapping(c)) // rename via alias
    else
      df(c)                // keep the column untouched
  }

  df.select(cols: _*)
}

The function goes through the given columns, checking first whether the current column exists in mapping. If so, it renames it using the corresponding value from the dictionary; otherwise the column remains untouched. Note that this just renames the column (via an alias), hence we don't expect it to affect performance.
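If you want to double-check that, printing the query plan confirms that the aliases add no extra computation:

// Inspect the physical plan of the transformed frame; the renames
// are resolved during analysis and add no extra work at runtime.
transform(df1, mapping).explain()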

Finally, some examples:

val newDF1 = transform(df1, mapping)
newDF1.show

// +----------+---+-----+----------+
// |first_name|age|class|       dob|
// +----------+---+-----+----------+
// |      toto| 23|    g|2010-06-09|
// |       bla| 35|    s|1990-10-01|
// |      pino| 12|    a|1995-10-05|
// +----------+---+-----+----------+


val newDF2 = transform(df2, mapping)
newDF2.show

// +----------+---+-----+----------+
// |first_name|age|class|       dob|
// +----------+---+-----+----------+
// |      toto| 23|    g|2010-06-09|
// |       bla| 35|    s|1990-10-01|
// |      pino| 12|    a|1995-10-05|
// +----------+---+-----+----------+
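Since both frames now share exactly the same header, they can, for instance, be unioned afterwards:

// With identical headers, the normalized frames combine cleanly:
val all = newDF1.union(newDF2) // 6 rows in this example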

Upvotes: 1

z_1_p

Reputation: 409

You can use a list of schema pairs and then iterate over it, as below:

Approach 1:
val df = Seq((1, "goutam", "kumar"), (2, "xyz", "kumar")).toDF("id", "fname", "lname")
val schema = Seq("id" -> "sid", "fname" -> "sfname", "lname" -> "slname")
val mappedSchema = schema.map { case (oldName, newName) => df(oldName).as(newName) }
df.select(mappedSchema: _*)

While reading the CSV you can pass option("header", false); then you can get rid of the old-name-to-new-name mapping entirely and rename the columns positionally, as in Approach 2.
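A minimal sketch of that idea (the path is hypothetical):

// With header=false Spark assigns default names (_c0, _c1, ...),
// so the columns can be renamed purely by position:
val raw = spark.read.option("header", false).csv("/path/to/file.csv") // hypothetical path
val named = raw.toDF("sid", "sfname", "slname")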

Approach 2:
val schema = Seq("sid", "sfname", "slname")
val mappedSchema = df.columns.zip(schema)
val mappedSchemaWithDF = mappedSchema.map { case (oldName, newName) => df(oldName).as(newName) }
df.select(mappedSchemaWithDF: _*)
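Since Approach 2 renames every column purely by position, the same result can also be obtained with a single toDF call:

// Positional rename of all columns in one step:
val renamed = df.toDF(schema: _*)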

Upvotes: 2

meniluca

Reputation: 306

The function withColumnRenamed also works if the column is not present in the DataFrame (it is simply a no-op). Hence you can go ahead and read all the DataFrames, apply the same renaming logic everywhere, and union them all later.

import org.apache.spark.sql.DataFrame

def renaming(df: DataFrame): DataFrame = {
  df.withColumnRenamed("dob", "DateOfBirth")
    .withColumnRenamed("fname", "name")
    .withColumnRenamed("f_name", "name")
    .withColumnRenamed("user_age", "age")
  // append more renaming calls here
}
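If the list of renames keeps growing, the same chain can be driven by a Map instead of hard-coded calls (a sketch in the same spirit; the mapping argument is hypothetical):

// Data-driven variant: fold the mapping over the DataFrame.
// withColumnRenamed is a no-op when the old name is absent, so a
// single mapping can safely be applied to every input file.
def renamingFromMap(df: DataFrame, mapping: Map[String, String]): DataFrame =
  mapping.foldLeft(df) { case (acc, (oldName, newName)) =>
    acc.withColumnRenamed(oldName, newName)
  }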

val df1 = renaming(spark.read.csv("...your path here"))

val df2 = renaming(spark.read.csv("...another path here"))

val result = df1.union(df2) // unionAll is deprecated in Spark 2.x

result will have the same schema (name, age, class, DateOfBirth) in this case.
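Note that union matches columns by position; if the column order can differ between files, unionByName (available since Spark 2.3) matches them by name instead:

// Matches columns by name rather than by position:
val byName = df1.unionByName(df2)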

Edit:

Following your input, if I understand correctly what you need to do, what about this?

val df1 = spark.read.csv("...your path here").toDF("name", "age", "class", "born_date")

val df2 = spark.read.csv("...another path here").toDF("name", "age", "class", "born_date")

Upvotes: 1
