Reputation: 107
I have a dataframe which currently looks like this
|col_id|r_id_1|r_id_2|r_id_3|
|------|------|------|------|
| 1 | a1 | b1 | c1 |
| 1 | a2 | b2 | c2 |
| 2 | a3 | b3 | c3 |
| 2 | a4 | b4 | c4 |
I am looking to convert it in the form
|col_id|r_id_1|r_id_2|r_id_3|r_id_1|r_id_2|r_id_3|
|------|------|------|------|------|------|------|
| 1 | a1 | b1 | c1 | a2 | b2 | c2 |
| 2 | a3 | b3 | c3 | a4 | b4 | c4 |
Thus, there are 2 rows with col_id 1; the rows should be grouped by col_id, with the values of each group's rows spread out into new columns. Note: every col_id has the same number of rows.
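For reference, the requested reshape can be sketched on plain Scala collections (a hedged illustration, outside Spark): group the rows by the id, then concatenate each group's remaining fields into one wide row.

```scala
// Plain-Scala sketch of the requested reshape (not Spark): rows sharing a
// col_id are concatenated side by side into a single wide row.
val rows = Seq(
  (1, Seq("a1", "b1", "c1")),
  (1, Seq("a2", "b2", "c2")),
  (2, Seq("a3", "b3", "c3")),
  (2, Seq("a4", "b4", "c4"))
)

val wide = rows
  .groupBy(_._1)                                      // group by col_id
  .map { case (id, grp) => id -> grp.flatMap(_._2) }  // concatenate each group's values
  .toSeq
  .sortBy(_._1)

// wide == Seq(1 -> Seq(a1, b1, c1, a2, b2, c2), 2 -> Seq(a3, b3, c3, a4, b4, c4))
```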
Upvotes: 0
Views: 896
Reputation: 35249
This should do it:
import org.apache.spark.sql.expressions._
import org.apache.spark.sql.functions._
import spark.implicits._  // for toDF and the flatMap encoder

val df = Seq(
  (1, "a1", "b1", "c1"),
  (1, "a2", "b2", "c2"),
  (2, "a3", "b3", "c3"),
  (2, "a4", "b4", "c4")
).toDF("col_id", "r_id_1", "r_id_2", "r_id_3")

// All columns except col_id; these are the ones to spread out.
val cols = df.columns.tail

df
  // Number the rows within each col_id group (1, 2, ...).
  .withColumn("rn",
    row_number().over(Window.partitionBy("col_id").orderBy("r_id_1")))
  // Melt to long form: one (col_id, "<column>_<rn>", value) triple per cell.
  .flatMap { row =>
    row.getValuesMap[String](cols).map {
      case (c, v) => (row.getAs[Int]("col_id"), s"${c}_${row.getAs[Int]("rn")}", v)
    }
  }
  // Pivot the generated names back into columns, one row per col_id.
  .groupBy("_1")
  .pivot("_2")
  .agg(first("_3"))
  .show
+---+--------+--------+--------+--------+--------+--------+
| _1|r_id_1_1|r_id_1_2|r_id_2_1|r_id_2_2|r_id_3_1|r_id_3_2|
+---+--------+--------+--------+--------+--------+--------+
|  1|      a1|      a2|      b1|      b2|      c1|      c2|
|  2|      a3|      a4|      b3|      b4|      c3|      c4|
+---+--------+--------+--------+--------+--------+--------+
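One thing worth noting (an assumption about Spark's default behaviour, not something the snippet states): when `pivot` is called without an explicit value list, Spark collects the distinct values of the pivot column and sorts them, so the generated columns come out in lexicographic order rather than source order. A minimal plain-Scala sketch of that ordering:

```scala
// Hedged illustration: the "<column>_<rn>" names generated in the flatMap
// arrive in source order, but the pivoted columns appear sorted.
val generated = Seq("r_id_1_1", "r_id_2_1", "r_id_3_1", "r_id_1_2", "r_id_2_2", "r_id_3_2")
val asColumns = generated.sorted
// asColumns: r_id_1_1, r_id_1_2, r_id_2_1, r_id_2_2, r_id_3_1, r_id_3_2
```

If a fixed order is wanted, the two-argument overload `pivot("_2", values)` accepts an explicit list of pivot values, which also avoids the extra pass Spark needs to discover them.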
Upvotes: 2