Adheip Nagarajan

Reputation: 107

Spark - Appending multiple rows to create columns for a common column id

I have a dataframe which currently looks like this:

|col_id|r_id_1|r_id_2|r_id_3|
|   1  |  a1  |  b1  |  c1  |
|   1  |  a2  |  b2  |  c2  |
|   2  |  a3  |  b3  |  c3  |
|   2  |  a4  |  b4  |  c4  |

I am looking to convert it to this form:

|col_id|r_id_1|r_id_2|r_id_3|r_id_1|r_id_2|r_id_3|
|   1  |  a1  |  b1  |  c1  |  a2  |  b2  |  c2  |
|   2  |  a3  |  b3  |  c3  |  a4  |  b4  |  c4  |

Thus, there are 2 rows with col_id 1; after grouping by col_id, the existing rows should generate new columns. Note: the number of rows for each column id is the same.
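To make the requested reshape concrete, here is a minimal sketch using plain Scala collections (no Spark): each col_id's rows are concatenated side by side. The `rows`/`wide` names are just for illustration.

```scala
// Each input row: (col_id, cell values for r_id_1..r_id_3)
val rows = Seq(
  (1, Seq("a1", "b1", "c1")),
  (1, Seq("a2", "b2", "c2")),
  (2, Seq("a3", "b3", "c3")),
  (2, Seq("a4", "b4", "c4"))
)

// Group by col_id and concatenate each group's rows into one wide row.
// Seq#groupBy preserves the original order of rows within each group.
val wide = rows
  .groupBy(_._1)
  .map { case (id, grp) => id -> grp.flatMap(_._2) }
  .toSeq
  .sortBy(_._1)
// wide: Seq((1, a1..c2), (2, a3..c4))
```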

Upvotes: 0

Views: 896

Answers (1)

Alper t. Turker

Reputation: 35249

This should do:

import org.apache.spark.sql.expressions._
import org.apache.spark.sql.functions._
import spark.implicits._  // needed for toDF and the Dataset encoders outside spark-shell

val df = Seq(
  (1, "a1", "b1", "c1"),
  (1, "a2", "b2", "c2"),
  (2, "a3", "b3", "c3"),
  (2, "a4", "b4", "c4")
).toDF("col_id", "r_id_1", "r_id_2", "r_id_3")

val cols = df.columns.tail

df
  .withColumn("rn",
    row_number().over(Window.partitionBy("col_id").orderBy("r_id_1")))
  .flatMap { row => row.getValuesMap[String](cols).map {
    case (c, t) => (row.getAs[Int]("col_id"), s"${c}_${row.getAs[Int]("rn")}", t) } }
  .groupBy("_1")
  .pivot("_2")
  .agg(first("_3"))
  .show

+---+--------+--------+--------+--------+--------+--------+
| _1|r_id_1_1|r_id_1_2|r_id_2_1|r_id_2_2|r_id_3_1|r_id_3_2|
+---+--------+--------+--------+--------+--------+--------+
|  1|      a1|      a2|      b1|      b2|      c1|      c2|
|  2|      a3|      a4|      b3|      b4|      c3|      c4|
+---+--------+--------+--------+--------+--------+--------+
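The key step above is the flatMap, which turns each cell into a (col_id, "column_rownumber", value) triple that pivot then spreads back into columns. A minimal plain-Scala sketch of that long format (the `row` tuple here is an illustration, not a Spark Row):

```scala
// One logical row: (col_id, row number within the group, cell values)
val cols = Seq("r_id_1", "r_id_2", "r_id_3")
val row = (1, 1, Seq("a1", "b1", "c1"))

// Pair each column name with its value and emit
// (col_id, "<column>_<row-number>", value) triples.
val long = cols.zip(row._3).map { case (c, v) =>
  (row._1, s"${c}_${row._2}", v)
}
// long: (1, r_id_1_1, a1), (1, r_id_2_1, b1), (1, r_id_3_1, c1)
```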

Upvotes: 2
