Reputation: 9024
I have the following dataset
And I want to convert it into the following using Spark. Any pointers would be helpful.
Upvotes: 0
Views: 373
Reputation: 1892
In Spark 2.4.3 you can use map_from_arrays; it is a built-in function and pretty straightforward.
scala> val df = Seq((1,40,60,10), (2,34,10,20), (3,87,29,62) ).toDF("cust_id","100x","200x","300x")
scala> df.show
+-------+----+----+----+
|cust_id|100x|200x|300x|
+-------+----+----+----+
| 1| 40| 60| 10|
| 2| 34| 10| 20|
| 3| 87| 29| 62|
+-------+----+----+----+
Applying map_from_arrays and then explode will give your desired result:
df.select(array('*).as("v"), lit(df.columns).as("k"))
  .select('v.getItem(0).as("cust_id"), map_from_arrays('k, 'v).as("map"))
  .select('cust_id, explode('map))
  .show(false)
+-------+-------+-----+
|cust_id|key |value|
+-------+-------+-----+
|1 |cust_id|1 |
|1 |100x |40 |
|1 |200x |60 |
|1 |300x |10 |
|2 |cust_id|2 |
|2 |100x |34 |
|2 |200x |10 |
|2 |300x |20 |
|3 |cust_id|3 |
|3 |100x |87 |
|3 |200x |29 |
|3 |300x |62 |
+-------+-------+-----+
I think a built-in function will give better performance than a UDF.
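If you don't want the cust_id entry showing up as a map key, one option is to build the map only from the value columns. This is just a minimal sketch against the same df (assuming the standard org.apache.spark.sql.functions import, as in spark-shell):

// sketch: keep cust_id as a plain column and map only the value columns
val valueCols = df.columns.filter(_ != "cust_id")

df.select(
    col("cust_id"),
    map_from_arrays(
      array(valueCols.map(lit): _*),   // keys: the column names as literals
      array(valueCols.map(col): _*)    // values: the corresponding column values
    ).as("map"))
  .select(col("cust_id"), explode(col("map")))
  .show(false)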
Upvotes: 1
Reputation: 1014
You can do it by using the stack function too.
Here is some example code to try out.
val df = Seq((1,40,60,10), (2,34,10,20), (3,87,29,62) ).toDF("cust_id","100x","200x","300x")
df.show()
scala> df.show()
+-------+----+----+----+
|cust_id|100x|200x|300x|
+-------+----+----+----+
| 1| 40| 60| 10|
| 2| 34| 10| 20|
| 3| 87| 29| 62|
+-------+----+----+----+
// build the stack() argument string: "'100x', 100x, '200x', 200x, '300x', 300x"
val skipColumn = "cust_id"
val columnCount = df.schema.size - 1

var columnsStr = ""
var counter = 0
for (col <- df.columns) {
  counter = counter + 1
  if (col != skipColumn) {
    if (counter == df.schema.size) {
      columnsStr = columnsStr + s"'$col', $col"   // last column: no trailing comma
    } else {
      columnsStr = columnsStr + s"'$col', $col,"
    }
  }
}

val unPivotDF = df.select($"cust_id",
  expr(s"stack($columnCount, $columnsStr) as (Sid,Value)"))
unPivotDF.show()
scala> unPivotDF.show()
+-------+----+-----+
|cust_id| Sid|Value|
+-------+----+-----+
| 1|100x| 40|
| 1|200x| 60|
| 1|300x| 10|
| 2|100x| 34|
| 2|200x| 10|
| 2|300x| 20|
| 3|100x| 87|
| 3|200x| 29|
| 3|300x| 62|
+-------+----+-----+
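If the columns are known up front, you can skip the string building and write the stack expression by hand. A minimal sketch against the same df (the backticked names are just the example's value columns):

// sketch: hard-coded stack expression for the three example value columns
val unPivotDirect = df.select($"cust_id",
  expr("stack(3, '100x', `100x`, '200x', `200x`, '300x', `300x`) as (Sid,Value)"))
unPivotDirect.show()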
Upvotes: 0
Reputation: 27373
I wrote a method some time ago that does this:
/**
* Transforms (reshapes) a dataframe by transforming columns into rows
*
* Note that the datatype of all columns to be transposed to rows must be the same!
*
* @param df The input dataframe
* @param remain The columns which should remain unchanged
* @param keyName The name of the new key-column
* @param valueName The name of the new value-column
* @return The transformed dataframe having (remain.size + 2) columns
*/
def colsToRows(df: DataFrame, remain: Seq[String], keyName: String = "key", valueName: String = "value"): DataFrame = {
  // cols: all columns to be transformed to rows
  val (cols, types) = df.dtypes.filter { case (c, _) => !remain.contains(c) }.unzip
  assert(types.distinct.size == 1, s"All columns need to have same type, but found ${types.distinct}")

  // make an array of the values in the columns and then explode it to generate rows
  val kvs = explode(array(
    cols.map(c => struct(lit(c).alias(keyName), col(c).alias(valueName))): _*
  ))

  // columns which should remain
  val byExprs = remain.map(col(_))

  // construct final dataframe
  df
    .select(byExprs :+ kvs.alias("_kvs"): _*)
    .select(byExprs ++ Seq(col(s"_kvs.$keyName"), col(s"_kvs.$valueName")): _*)
}
You can use it like this:
val df = Seq(
  (1,40,60,10),
  (2,34,10,20),
  (3,87,29,62)
).toDF("cust_id","100x","200x","300x")

colsToRows(df, remain = Seq("cust_id"), keyName = "sid")
  .show()
gives
+-------+----+-----+
|cust_id| sid|value|
+-------+----+-----+
| 1|100x| 40|
| 1|200x| 60|
| 1|300x| 10|
| 2|100x| 34|
| 2|200x| 10|
| 2|300x| 20|
| 3|100x| 87|
| 3|200x| 29|
| 3|300x| 62|
+-------+----+-----+
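As a side note, if I recall correctly Spark 3.4+ ships a built-in Dataset.unpivot (with melt as an alias) that does the same reshape directly; the first answer targets 2.4.3, where it is not available. A rough sketch assuming Spark 3.4+ and the same df:

// sketch: requires Spark 3.4+ (Dataset.unpivot), not available in 2.4.3
df.unpivot(
  Array(col("cust_id")),                          // id columns to keep as-is
  Array(col("100x"), col("200x"), col("300x")),   // columns to turn into rows
  "sid",                                          // name of the new key column
  "value"                                         // name of the new value column
).show()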
Upvotes: 0