Reputation: 936
I would like to change the value of multiple fields in a row of a dataframe df. Normally, I would do a row-to-row transformation using a map. Something like:
+---+---------+
|num|name     |
+---+---------+
|  1|Hydrogen |
|  2|Helium   |
+---+---------+
df.map { row =>
  // getAs needs an explicit type parameter; without one it is inferred as Nothing
  val name = row.getAs[String]("name").toUpperCase
  (row(0), name)
}
But now I have a dataframe with a very elaborate schema of many columns, and I want to change the values of only some of them. The new value of one column depends on other columns. How can I avoid writing out every column value (like row.get(0), row.get(1) ... row.get(30)) in the tuple, and write only the ones that have changed? Consider a df with this schema:
case class DFSchema(id: String, name: String, map1: Map[String, String], ... , map30: Map[String, String])
I want to update the keys and values of df.select("map30") and modify "name" only if id is "city". Of course, there are more such transformations to be made in other columns (represented in the schema as mapX).
I did not consider using a UDF for this problem because, even if the UDF returns a struct of many columns, I do not know how to change multiple columns using withColumn(), as it only accepts one column name. However, solutions using a UDF are just as welcome as those using .map over rows.
Upvotes: 1
Views: 995
Reputation: 185
You can try something like this:
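import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.{col, lit}

// Each pair is (target column name, expression that computes its new value)
val rules: Seq[(String, Column)] = Seq(
  "columnA" -> lit(20),
  "columnB" -> col("columnB").plus(col("columnC")),
  "columnC" -> col("columnC").minus(col("columnD")),
  "columnN" -> col("columnA").plus(col("columnB")).plus(col("columnC"))
)

// The original snippet omitted the function name; applyRules is a placeholder
def applyRules(inputDf: DataFrame): DataFrame = {
  rules.foldLeft(inputDf) {
    case (df, (columnName, ruleColumn)) => df.withColumn(columnName, ruleColumn)
  }
}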
Here rules is a sequence of pairs: the first value is the name of the target column we want to change or add, and the second is the rule (a column expression over the columns it depends on) that produces its new value. The foldLeft operation applies all the rules to the input DataFrame one after another. Because the rules run in order, a later rule sees the values already produced by earlier ones.
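As a rough, runnable sketch of the same foldLeft idea against the question's "only if id is city" condition: the object name RulesSketch, the sample rows, the num column, and the uppercase/+100 transformations are all made up for illustration and are not from the original posts.

```scala
import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, upper, when}

object RulesSketch {
  // Applies each (columnName -> expression) pair in order; untouched columns pass through
  def applyRules(df: DataFrame, rules: Seq[(String, Column)]): DataFrame =
    rules.foldLeft(df) { case (acc, (name, rule)) => acc.withColumn(name, rule) }

  def demo(): DataFrame = {
    // Local session and sample data are assumptions for this sketch only
    val spark = SparkSession.builder().master("local[1]").appName("rules-sketch").getOrCreate()
    import spark.implicits._
    val df = Seq(("city", "miami", 1), ("element", "helium", 2)).toDF("id", "name", "num")

    val isCity = col("id") === "city"
    val rules = Seq(
      "name" -> when(isCity, upper(col("name"))).otherwise(col("name")),
      // Runs after the rule above, so it compares against the already-updated name
      "num" -> when(col("name") === "MIAMI", col("num") + 100).otherwise(col("num"))
    )
    applyRules(df, rules)
  }
}
```

Calling RulesSketch.demo().show(false) prints the updated rows. If a rule needs the original value of a column that an earlier rule overwrites, it would have to be placed before that rule or read from a copy of the column.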
Upvotes: 3
Reputation: 175
You can try this:
import org.apache.spark.sql.functions.{col, lit, map, when}

df.show(false)

// Build one expression per column: rewrite "name" and "map30", pass the rest through
val newColumns = df.columns.map { x =>
  if (x == "name") {
    when(col("id") === "city", lit("miami")).otherwise(col("name")).as("name")
  } else if (x == "map30") {
    when(col("id") === "city", map(lit("h"), lit("update"), lit("n"), lit("new"))).otherwise(col("map30")).as("map30")
  } else {
    col(x).as(x)
  }
}

val cleanDf = df.select(newColumns: _*)
cleanDf.show(false)
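For the UDF route the question asks about, one possible workaround for withColumn() taking a single column name is to have the UDF return a struct and then copy its fields back over the original columns. The following is only a sketch: the names Updated, StructUdfSketch, updateRow and the temporary column "updated" are hypothetical, as is the uppercase transformation, and it assumes name and map30 are never null.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, udf}

// Hypothetical result type: one field per column the UDF rewrites
case class Updated(name: String, map30: Map[String, String])

object StructUdfSketch {
  // Computes both new values from the current row values (assumes non-null inputs)
  val updateRow = udf { (id: String, name: String, map30: Map[String, String]) =>
    if (id == "city") Updated(name.toUpperCase, map30.map { case (k, v) => k.toUpperCase -> v })
    else Updated(name, map30)
  }

  def applyUdf(df: DataFrame): DataFrame =
    df.withColumn("updated", updateRow(col("id"), col("name"), col("map30"))) // struct column
      .withColumn("name", col("updated.name"))   // overwrite from struct field
      .withColumn("map30", col("updated.map30")) // overwrite from struct field
      .drop("updated")
}
```

The other columns are never listed, so they pass through unchanged; StructUdfSketch.applyUdf(df) returns a DataFrame with the same column order as df.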
Upvotes: 2