Reputation: 387
I am pivoting my input data on multiple columns, one column at a time.
I am having trouble with the column headers once the pivoting is done.
Input data
Output Generated by my approach -
Expected Output Headers:
I need the headers of the output to look like -
Steps taken so far to produce the output I am getting:
// *Load the data*
scala> val input_data =spark.read.option("header","true").option("inferschema","true").option("delimiter","\t").csv("s3://mybucket/data.tsv")
// *Filter the data where residentFlag column = T*
scala> val filtered_data = input_data.select("numericID","age","salary","gender","residentFlag").filter($"residentFlag".contains("T"))
// *Now pivot the filtered data by the first column, named "age"*
scala> val pivotByAge = filtered_data.groupBy("age","numericID").pivot("age").agg(expr("coalesce(first(numericID),'-')")).drop("age")
// *Pivot the data by the second column named "salary"*
scala> val pivotBySalary = filtered_data.groupBy("salary","numericID").pivot("salary").agg(expr("coalesce(first(numericID),'-')")).drop("salary")
// *Join the above two dataframes based on the numericID*
scala> val intermediateDf = pivotByAge.join(pivotBySalary,"numericID")
// *Now pivot the filtered data from the filter step by the third column, named "gender"*
scala> val pivotByGender = filtered_data.groupBy("gender","numericID").pivot("gender").agg(expr("coalesce(first(numericID),'-')")).drop("gender")
// *Join the above dataframe with the intermediateDf*
scala> val outputDF= pivotByGender.join(intermediateDf ,"numericID")
How to rename the columns generated after pivoting?
Is there a different approach I can take for Pivoting the data set based on multiple columns (nearly 300)?
Any optimizations/suggestions for improving performance?
Upvotes: 1
Views: 1630
Reputation: 22449
Consider using foldLeft to traverse the list of columns to pivot: for each column, create the pivot dataframe, rename the generated pivot columns, and join the result with the dataframe accumulated so far:
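// Both imports are typically already in scope in spark-shell
import org.apache.spark.sql.functions.expr
import spark.implicits._
val data = Seq(
  (1, 30, 50000, "M"),
  (1, 25, 70000, "F"),
  (1, 40, 70000, "M"),
  (1, 30, 80000, "M"),
  (2, 30, 80000, "M"),
  (2, 40, 50000, "F"),
  (2, 25, 70000, "F")
).toDF("numericID", "age", "salary", "gender")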
// Build pivotCols, the list of columns to pivot (every column except the id)
val id = data.columns.head
val pivotCols = data.columns.filter(_ != id)
// Create the first pivot dataframe from the first column in list pivotCols and
// rename each of the generated pivot columns
val c1 = pivotCols.head
val df1 = data.groupBy(c1, id).pivot(c1).agg(expr(s"coalesce(first($id),'-')")).drop(c1)
val df1Renamed = df1.columns.tail.foldLeft(df1)((acc, x) =>
  acc.withColumnRenamed(x, c1 + "_" + x)
)
// Using the first pivot dataframe as the initial dataframe, process each of the
// remaining columns in list pivotCols similar to how the first column is processed,
// and cumulatively join each of them with the previously joined dataframe
pivotCols.tail.foldLeft(df1Renamed)(
  (accDF, c) => {
    val df = data.groupBy(c, id).pivot(c).agg(expr(s"coalesce(first($id),'-')")).drop(c)
    val dfRenamed = df.columns.tail.foldLeft(df)((acc, x) =>
      acc.withColumnRenamed(x, c + "_" + x)
    )
    dfRenamed.join(accDF, Seq(id))
  }
)
// +---------+--------+--------+------------+------------+------------+------+------+------+
// |numericID|gender_F|gender_M|salary_50000|salary_70000|salary_80000|age_25|age_30|age_40|
// +---------+--------+--------+------------+------------+------------+------+------+------+
// |2 |2 |- |2 |- |- |- |2 |- |
// |2 |2 |- |2 |- |- |2 |- |- |
// |2 |2 |- |2 |- |- |- |- |2 |
// |2 |2 |- |- |2 |- |- |2 |- |
// |2 |2 |- |- |2 |- |2 |- |- |
// |2 |2 |- |- |2 |- |- |- |2 |
// |2 |2 |- |- |- |2 |- |2 |- |
// |2 |2 |- |- |- |2 |2 |- |- |
// |2 |2 |- |- |- |2 |- |- |2 |
// |2 |- |2 |2 |- |- |- |2 |- |
// |2 |- |2 |2 |- |- |2 |- |- |
// |2 |- |2 |2 |- |- |- |- |2 |
// |2 |- |2 |- |2 |- |- |2 |- |
// |2 |- |2 |- |2 |- |2 |- |- |
// |2 |- |2 |- |2 |- |- |- |2 |
// |2 |- |2 |- |- |2 |- |2 |- |
// |2 |- |2 |- |- |2 |2 |- |- |
// |2 |- |2 |- |- |2 |- |- |2 |
// |1 |- |1 |- |1 |- |1 |- |- |
// |1 |- |1 |- |1 |- |- |- |1 |
// ...
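As a side note on the renaming step: the new headers can also be computed as plain strings first and applied in one go. Below is a minimal sketch of that idea. `PivotHeaders` and `prefixedNames` are hypothetical names made up for illustration (they are not part of Spark), and the sketch assumes the id column keeps its name while every other column gets a `<prefix>_` prefix, as in the answer above.

```scala
// Hypothetical helper (not a Spark API): computes the renamed headers
// for one pivoted dataframe as plain strings.
object PivotHeaders {
  /** Prefixes every column name except the id column with "<prefix>_". */
  def prefixedNames(columns: Seq[String], id: String, prefix: String): Seq[String] =
    columns.map(c => if (c == id) c else s"${prefix}_$c")
}
```

For a pivoted dataframe `df` built from column `c`, something like `df.toDF(PivotHeaders.prefixedNames(df.columns.toSeq, id, c): _*)` should apply all the names in a single call instead of one `withColumnRenamed` per column, which may matter when there are hundreds of pivot columns.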
Upvotes: 2
Reputation: 76
You could do something like this, using a regex match on each column name to pick its prefix:
var outputDF = pivotByGender.join(intermediateDf, "numericID")
val cols: Array[String] = outputDF.columns
cols.foreach {
  // gender pivot columns
  case cl @ ("F" | "M") => outputDF = outputDF.withColumnRenamed(cl, s"gender_$cl")
  // two-digit names are assumed to be age pivot columns
  case cl if cl.matches("""\d{2}""") => outputDF = outputDF.withColumnRenamed(cl, s"age_$cl")
  // leave numericID and any other column unchanged (avoids a MatchError)
  case _ =>
}
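The same value-based idea can be written as a pure function that maps an old header to a new one, which avoids the `var`. This is only a sketch: `HeaderRules` and `rename` are hypothetical names, and the rules themselves are assumptions about the data (ages are two-digit numbers, salaries have three or more digits, genders are "F" or "M").

```scala
// Hypothetical helper (not a Spark API): picks a prefix from the header's value.
object HeaderRules {
  private val Age    = """\d{2}""".r   // assumption: ages are two-digit values
  private val Salary = """\d{3,}""".r  // assumption: salaries have three or more digits

  def rename(col: String): String = col match {
    case "F" | "M" => s"gender_$col"
    case Age()     => s"age_$col"     // a Regex extractor matches the whole string
    case Salary()  => s"salary_$col"
    case _         => col             // e.g. numericID stays unchanged
  }
}
```

It could then be applied with something like `outputDF.toDF(outputDF.columns.map(HeaderRules.rename): _*)`. Note that guessing the source column from the value is fragile once two pivoted columns can share values, so adding the prefix right after each pivot is probably the safer route with nearly 300 columns.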
Upvotes: 1