Manikandan Kannan

Reputation: 9024

Spark table manipulation - column values to rows and row values transposed

I have the following dataset

+-------+----+----+----+
|cust_id|100x|200x|300x|
+-------+----+----+----+
|      1|  40|  60|  10|
|      2|  34|  10|  20|
|      3|  87|  29|  62|
+-------+----+----+----+

And I want to convert this to the following using Spark. Any pointers would be helpful.

+-------+----+-----+
|cust_id| sid|value|
+-------+----+-----+
|      1|100x|   40|
|      1|200x|   60|
|      1|300x|   10|
|      2|100x|   34|
|      2|200x|   10|
|      2|300x|   20|
|      3|100x|   87|
|      3|200x|   29|
|      3|300x|   62|
+-------+----+-----+

Upvotes: 0

Views: 373

Answers (3)

Mahesh Gupta

Reputation: 1892

In Spark 2.4.3 you can use map_from_arrays; it is a straightforward built-in function.

scala> val df = Seq((1,40,60,10), (2,34,10,20), (3,87,29,62) ).toDF("cust_id","100x","200x","300x")

scala> df.show
+-------+----+----+----+
|cust_id|100x|200x|300x|
+-------+----+----+----+
|      1|  40|  60|  10|
|      2|  34|  10|  20|
|      3|  87|  29|  62|
+-------+----+----+----+

Applying map_from_arrays and explode will give your desired result:

df.select(array('*).as("v"), lit(df.columns).as("k"))
  .select('v.getItem(0).as("cust_id"), map_from_arrays('k, 'v).as("map"))
  .select('cust_id, explode('map))
  .show(false)

+-------+-------+-----+
|cust_id|key    |value|
+-------+-------+-----+
|1      |cust_id|1    |
|1      |100x   |40   |
|1      |200x   |60   |
|1      |300x   |10   |
|2      |cust_id|2    |
|2      |100x   |34   |
|2      |200x   |10   |
|2      |300x   |20   |
|3      |cust_id|3    |
|3      |100x   |87   |
|3      |200x   |29   |
|3      |300x   |62   |
+-------+-------+-----+

A built-in function should give better performance than a UDF.
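
If you don't want the cust_id entry itself repeated as a key/value row (as in the other answers' output), here is a minimal sketch filtering it out after the explode:

import org.apache.spark.sql.functions._
import spark.implicits._

df.select(array('*).as("v"), lit(df.columns).as("k"))
  .select('v.getItem(0).as("cust_id"), map_from_arrays('k, 'v).as("map"))
  .select('cust_id, explode('map))
  .filter('key =!= "cust_id")  // drop the row that just repeats the id as a key
  .show(false)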

Upvotes: 1

Ramdev Sharma

Reputation: 1014

You can do this by using the stack function too.

Here is example code to try it out.

val df = Seq((1,40,60,10), (2,34,10,20), (3,87,29,62) ).toDF("cust_id","100x","200x","300x")
scala> df.show()
+-------+----+----+----+
|cust_id|100x|200x|300x|
+-------+----+----+----+
|      1|  40|  60|  10|
|      2|  34|  10|  20|
|      3|  87|  29|  62|
+-------+----+----+----+

val skipColumn = "cust_id"
val columnCount = df.schema.size - 1

// build the "'name', name, ..." argument list for stack()
var columnsStr = ""
var counter = 0
for (col <- df.columns) {
  counter = counter + 1
  if (col != skipColumn) {
    if (counter == df.schema.size) {
      columnsStr = columnsStr + s"'$col', $col"
    } else {
      columnsStr = columnsStr + s"'$col', $col,"
    }
  }
}

val unPivotDF = df.select($"cust_id",
  expr(s"stack($columnCount, $columnsStr) as (Sid,Value)"))
unPivotDF.show()

scala> unPivotDF.show()
+-------+----+-----+
|cust_id| Sid|Value|
+-------+----+-----+
|      1|100x|   40|
|      1|200x|   60|
|      1|300x|   10|
|      2|100x|   34|
|      2|200x|   10|
|      2|300x|   20|
|      3|100x|   87|
|      3|200x|   29|
|      3|300x|   62|
+-------+----+-----+
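
As a side note, the same stack expression can be built without the mutable loop; a minimal sketch reusing the df and skipColumn above:

// Build the stack() argument list functionally instead of in a loop
val stackCols  = df.columns.filter(_ != skipColumn)
val columnsStr = stackCols.map(c => s"'$c', $c").mkString(", ")

val unPivotDF = df.select($"cust_id",
  expr(s"stack(${stackCols.length}, $columnsStr) as (Sid,Value)"))
unPivotDF.show()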

Upvotes: 0

Raphael Roth

Reputation: 27373

I wrote a method some time ago to do this:

/**
  * Transforms (reshapes) a dataframe by transforming columns into rows
  *
  * Note that the datatype of all columns to be transposed to rows must be the same!
  *
  * @param df The input dataframe
  * @param remain The columns which should remain unchanged
  * @param keyName The name of the new key-column
  * @param valueName The name of the new value-column
  * @return The transformed dataframe having (remain.size + 2) columns
  */
def colsToRows(df: DataFrame, remain: Seq[String], keyName:String="key",valueName:String="value"): DataFrame = {
  // cols: all columns to be transformed to rows
  val (cols, types) = df.dtypes.filter{ case (c, _) => !remain.contains(c)}.unzip
  assert(types.distinct.size == 1,s"All columns need to have same type, but found ${types.distinct}")

  // make an array of the values in the columns and then explode it to generate rows
  val kvs = explode(array(
    cols.map(c => struct(lit(c).alias(keyName), col(c).alias(valueName))): _*
  ))

  // columns which should remain
  val byExprs = remain.map(col(_))

  // construct final dataframe
  df
    .select(byExprs :+ kvs.alias("_kvs"): _*)
    .select(byExprs ++ Seq(col(s"_kvs.$keyName"), col(s"_kvs.$valueName")): _*)
}

You can use it like this:

val df = Seq(
  (1,40,60,10),
  (2,34,10,20),
  (3,87,29,62)
).toDF("cust_id","100x","200x","300x")

colsToRows(df, remain = Seq("cust_id"), keyName = "sid")
  .show()

gives

+-------+----+-----+
|cust_id| sid|value|
+-------+----+-----+
|      1|100x|   40|
|      1|200x|   60|
|      1|300x|   10|
|      2|100x|   34|
|      2|200x|   10|
|      2|300x|   20|
|      3|100x|   87|
|      3|200x|   29|
|      3|300x|   62|
+-------+----+-----+
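
For completeness: Spark 3.4+ ships a built-in Dataset.unpivot (a.k.a. melt) that does this reshaping directly; a minimal sketch, assuming that version:

// Spark 3.4+ only: built-in unpivot (melt)
val unpivoted = df.unpivot(
  Array(col("cust_id")),                        // id columns to keep as-is
  Array(col("100x"), col("200x"), col("300x")), // columns to turn into rows
  "sid",                                        // name of the new key column
  "value"                                       // name of the new value column
)
unpivoted.show()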

Upvotes: 0
