Reputation: 65
Suppose we have a CSV file with the header
fname,age,class,dob
and a second CSV file with the header
f_name,user_age,class,DataofBith
I am trying to map every CSV file to a common, standardized header, so that each one returns a DataFrame with columns like
first_name,age,class,dob
val df2 = df.withColumnRenamed("DataofBith", "dob").withColumnRenamed("f_name", "first_name")
df2.printSchema()
But that approach is not generic. Can we do this dynamically for every CSV, following a standard conversion, so that for example both fname and f_name
are converted to first_name?
Upvotes: 1
Views: 813
Reputation: 7316
You can use a simple select
in combination with a Scala Map
. It is easier to handle the column transformations via a dictionary (Map) in which the key is the old name and the value the new name.
Let's first create the two datasets as you described them:
import spark.implicits._ // required for toDF on local Seqs

val df1 = Seq(
("toto", 23, "g", "2010-06-09"),
("bla", 35, "s", "1990-10-01"),
("pino", 12, "a", "1995-10-05")
).toDF("fname", "age", "class", "dob")
val df2 = Seq(
("toto", 23, "g", "2010-06-09"),
("bla", 35, "s", "1990-10-01"),
("pino", 12, "a", "1995-10-05")
).toDF("f_name", "user_age", "class", "DataofBith")
Next we define a Scala function named transform
which accepts two arguments: the target df
and a mapping
that contains the transformation details:
val mapping = Map(
"fname" -> "first_name",
"f_name" -> "first_name",
"user_age" -> "age",
"DataofBith" -> "dob"
)
import org.apache.spark.sql.DataFrame

def transform(df: DataFrame, mapping: Map[String, String]): DataFrame = {
  val keys = mapping.keySet
  val cols = df.columns.map { c =>
    if (keys.contains(c))
      df(c).as(mapping(c)) // rename via alias
    else
      df(c)                // keep the column as-is
  }
  df.select(cols: _*)
}
The function goes through the given columns, first checking whether the current column exists in mapping
. If so, it renames it using the corresponding value from the dictionary; otherwise the column is left untouched. Note that this just renames the column (via an alias), hence we don't expect it to affect performance.
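To see the effect of the lookup without spinning up a Spark session, the same header resolution can be sketched in plain Scala (the mapping below is the one defined above; normalize is a hypothetical helper for illustration):

```scala
// Plain-Scala sketch of the same lookup: each header either maps to
// its canonical name via the dictionary or is kept unchanged.
val mapping = Map(
  "fname" -> "first_name",
  "f_name" -> "first_name",
  "user_age" -> "age",
  "DataofBith" -> "dob"
)

def normalize(columns: Seq[String]): Seq[String] =
  columns.map(c => mapping.getOrElse(c, c))

val h1 = normalize(Seq("fname", "age", "class", "dob"))
val h2 = normalize(Seq("f_name", "user_age", "class", "DataofBith"))
// both headers resolve to: first_name, age, class, dob
```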
Finally, some examples:
val newDF1 = transform(df1, mapping)
newDF1.show
// +----------+---+-----+----------+
// |first_name|age|class| dob|
// +----------+---+-----+----------+
// | toto| 23| g|2010-06-09|
// | bla| 35| s|1990-10-01|
// | pino| 12| a|1995-10-05|
// +----------+---+-----+----------+
val newDF2 = transform(df2, mapping)
newDF2.show
// +----------+---+-----+----------+
// |first_name|age|class| dob|
// +----------+---+-----+----------+
// | toto| 23| g|2010-06-09|
// | bla| 35| s|1990-10-01|
// | pino| 12| a|1995-10-05|
// +----------+---+-----+----------+
Upvotes: 1
Reputation: 409
You can build a list of schema mappings and then iterate over it, as below.
Approach 1:
val df = Seq((1, "goutam", "kumar"), (2, "xyz", "kumar")).toDF("id", "fname", "lname")
val schema = Seq("id" -> "sid", "fname" -> "sfname", "lname" -> "slname")
val mappedSchema = schema.map(x => df(x._1).as(x._2))
df.select(mappedSchema: _*)
While reading the CSV, you can pass option("header", false)
and then handle the mapping from the old schema to the new schema yourself.
Approach 2:
val schema = Seq("sid", "sfname", "slname")
val mappedSchema = df.columns.zip(schema)
val mappedSchemaWithDF = mappedSchema.map(x => df(x._1).as(x._2))
df.select(mappedSchemaWithDF: _*)
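The zip-based pairing in Approach 2 is purely positional, so the new-name list must line up with the existing columns in order. A plain-Scala sketch of the pairing (no Spark needed, column names taken from the example above) makes that precondition explicit:

```scala
// Positional renaming: pair each existing column name with the new
// name at the same index, exactly as columns.zip(schema) does above.
val oldCols = Seq("id", "fname", "lname")
val newCols = Seq("sid", "sfname", "slname")
require(oldCols.length == newCols.length, "one new name per column")

val pairs = oldCols.zip(newCols)
// pairs: Seq((id,sid), (fname,sfname), (lname,slname))
```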
Upvotes: 2
Reputation: 306
The function withColumnRenamed
also works if the column is not present in the dataframe: in that case it is simply a no-op. Hence you can read all the dataframes, apply the same renaming logic everywhere, and union them all later.
import org.apache.spark.sql.DataFrame
def renaming(df: DataFrame): DataFrame = {
df.withColumnRenamed("dob", "DateOfBirth")
.withColumnRenamed("fname", "name")
.withColumnRenamed("f_name", "name")
.withColumnRenamed("user_age", "age")
// append more renaming functions here
}
val df1 = renaming(spark.read.csv("...your path here"))
val df2 = renaming(spark.read.csv("...another path here"))
val result = df1.union(df2) // unionAll is deprecated since Spark 2.0
result
will have the same schema (DateOfBirth, name, age) in this case.
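The chain of withColumnRenamed calls is equivalent to folding a rename map over the header, and unknown columns pass through untouched, mirroring withColumnRenamed's no-op behavior for absent columns. A small Spark-free sketch of that logic (renameHeader is a hypothetical helper, with the rename pairs from the function above):

```scala
// Model the renaming on plain header lists: known names are replaced,
// unknown names pass through unchanged (the no-op case).
val renames = Map(
  "dob" -> "DateOfBirth",
  "fname" -> "name",
  "f_name" -> "name",
  "user_age" -> "age"
)

def renameHeader(header: Seq[String]): Seq[String] =
  header.map(c => renames.getOrElse(c, c))

val h1 = renameHeader(Seq("fname", "age", "class", "dob"))
// h1: name, age, class, DateOfBirth
```

A column not covered by the map (e.g. DataofBith) survives unchanged, which is why the "append more renaming functions here" comment above matters when new source headers show up.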
Edit:
Following your input, if I understand correctly what you need, what about this?
val df1 = spark.read.csv("...your path here").toDF("name", "age", "class", "born_date")
val df2 = spark.read.csv("...another path here").toDF("name", "age", "class", "born_date")
Upvotes: 1