Reputation: 91
I have a dataframe with fixed columns m1_amt to m4_amt, containing data in the format below:
+------+------+------+------+------+
|Entity|m1_amt|m2_amt|m3_amt|m4_amt|
+------+------+------+------+------+
|   ISO|     1|     2|     3|     4|
|  TEST|     5|     6|     7|     8|
|  Beta|     9|    10|    11|    12|
+------+------+------+------+------+
I am trying to transpose it so that each row becomes a column:
+------+---+----+----+
|Entity|ISO|TEST|Beta|
+------+---+----+----+
|m1_amt|  1|   5|   9|
|m2_amt|  2|   6|  10|
|m3_amt|  3|   7|  11|
|m4_amt|  4|   8|  12|
+------+---+----+----+
How can I achieve this in Spark and Scala?
Upvotes: 1
Views: 122
Reputation: 91
I tried it the following way:
scala> val df=Seq(("ISO",1,2,3,4),
| ("TEST",5,6,7,8),
| ("Beta",9,10,11,12)).toDF("Entity","m1_amt","m2_amt","m3_amt","m4_amt")
df: org.apache.spark.sql.DataFrame = [Entity: string, m1_amt: int ... 3 more fields]
scala> df.show
+------+------+------+------+------+
|Entity|m1_amt|m2_amt|m3_amt|m4_amt|
+------+------+------+------+------+
|   ISO|     1|     2|     3|     4|
|  TEST|     5|     6|     7|     8|
|  Beta|     9|    10|    11|    12|
+------+------+------+------+------+
scala> val selectDf= df.selectExpr("Entity","stack(4,'m1_amt',m1_amt,'m2_amt',m2_amt,'m3_amt',m3_amt,'m4_amt',m4_amt)")
selectDf: org.apache.spark.sql.DataFrame = [Entity: string, col0: string ... 1 more field]
scala> selectDf.show
+------+------+----+
|Entity|  col0|col1|
+------+------+----+
|   ISO|m1_amt|   1|
|   ISO|m2_amt|   2|
|   ISO|m3_amt|   3|
|   ISO|m4_amt|   4|
|  TEST|m1_amt|   5|
|  TEST|m2_amt|   6|
|  TEST|m3_amt|   7|
|  TEST|m4_amt|   8|
|  Beta|m1_amt|   9|
|  Beta|m2_amt|  10|
|  Beta|m3_amt|  11|
|  Beta|m4_amt|  12|
+------+------+----+
scala> selectDf.groupBy("col0").pivot("Entity").agg(concat_ws("",collect_list(col("col1")))).withColumnRenamed("col0","Entity").show
+------+----+---+----+
|Entity|Beta|ISO|TEST|
+------+----+---+----+
|m3_amt|  11|  3|   7|
|m4_amt|  12|  4|   8|
|m2_amt|  10|  2|   6|
|m1_amt|   9|  1|   5|
+------+----+---+----+
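The stack and pivot steps above can also be written without hard-coding the column list. The following is only a sketch under some assumptions: the helper name transpose, the object name TransposeSketch, and the local SparkSession setup are made up for illustration, and all value columns are assumed to share one type (stack requires that). It swaps concat_ws(collect_list(...)) for first(...), which keeps the values as integers, passes the pivot values explicitly to fix the column order, and sorts the rows by name.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.first

object TransposeSketch {
  // Hypothetical helper: unpivot every non-key column with stack(),
  // then pivot the key column back into columns.
  def transpose(df: DataFrame, keyCol: String, keyOrder: Seq[String]): DataFrame = {
    val valueCols = df.columns.filter(_ != keyCol)
    // Builds: stack(4, 'm1_amt', `m1_amt`, ..., 'm4_amt', `m4_amt`) as (name, value)
    val stackArgs = valueCols.map(c => s"'$c', `$c`").mkString(", ")
    val stackExpr = s"stack(${valueCols.length}, $stackArgs) as (name, value)"

    df.selectExpr(s"`$keyCol`", stackExpr)
      .groupBy("name")
      .pivot(keyCol, keyOrder) // explicit values fix the output column order
      .agg(first("value"))     // exactly one value per (name, key) pair here
      .orderBy("name")         // lexicographic: m1_amt, m2_amt, m3_amt, m4_amt
      .withColumnRenamed("name", keyCol)
  }

  def main(args: Array[String]): Unit = {
    // Assumption: a local session, only for trying the sketch out.
    val spark = SparkSession.builder().master("local[*]").appName("transpose-sketch").getOrCreate()
    import spark.implicits._

    val df = Seq(("ISO", 1, 2, 3, 4), ("TEST", 5, 6, 7, 8), ("Beta", 9, 10, 11, 12))
      .toDF("Entity", "m1_amt", "m2_amt", "m3_amt", "m4_amt")

    transpose(df, "Entity", Seq("ISO", "TEST", "Beta")).show()
    spark.stop()
  }
}
```

Passing the pivot values explicitly also saves Spark the extra job it otherwise runs to find the distinct values of Entity. Note that orderBy sorts the names as strings, so a column such as m10_amt would sort before m2_amt.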
Upvotes: 2
Reputation: 2431
scala> df.show
+------+------+------+------+------+
|Entity|m1_amt|m2_amt|m3_amt|m4_amt|
+------+------+------+------+------+
|   ISO|     1|     2|     3|     4|
|  TEST|     5|     6|     7|     8|
|  Beta|     9|    10|    11|    12|
+------+------+------+------+------+
scala> val df1 = df.withColumn("amt", to_json(struct(col("m1_amt"),col("m2_amt"),col("m3_amt"),col("m4_amt"))))
         .withColumn("amt", regexp_replace(col("amt"), """[\\{\\"\\}]""", ""))
         .withColumn("amt", explode(split(col("amt"), ",")))
         .withColumn("cols", split(col("amt"), ":")(0))
         .withColumn("val", split(col("amt"), ":")(1))
         .select("Entity","cols","val")
scala> df1.show
+------+------+---+
|Entity|  cols|val|
+------+------+---+
|   ISO|m1_amt|  1|
|   ISO|m2_amt|  2|
|   ISO|m3_amt|  3|
|   ISO|m4_amt|  4|
|  TEST|m1_amt|  5|
|  TEST|m2_amt|  6|
|  TEST|m3_amt|  7|
|  TEST|m4_amt|  8|
|  Beta|m1_amt|  9|
|  Beta|m2_amt| 10|
|  Beta|m3_amt| 11|
|  Beta|m4_amt| 12|
+------+------+---+
scala> df1.groupBy(col("cols")).pivot("Entity").agg(concat_ws("",collect_set(col("val"))))
         .withColumnRenamed("cols", "Entity")
         .show()
+------+----+---+----+
|Entity|Beta|ISO|TEST|
+------+----+---+----+
|m3_amt|  11|  3|   7|
|m4_amt|  12|  4|   8|
|m2_amt|  10|  2|   6|
|m1_amt|   9|  1|   5|
+------+----+---+----+
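The JSON round trip above turns every value into a string and would break if a value ever contained a comma or a colon. As a possible alternative for the unpivot step (a different technique from the one used above, shown only as a sketch), each column can be wrapped in a struct of (name, value) and the array of structs exploded. The names StructUnpivotSketch, unpivot, and kv are hypothetical, and the value columns are assumed to share one type.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{array, col, explode, first, lit, struct}

object StructUnpivotSketch {
  // Hypothetical helper: one output row per (key, column name, value),
  // without converting the values to strings.
  def unpivot(df: DataFrame, keyCol: String): DataFrame = {
    val pairs = df.columns.filter(_ != keyCol).map { c =>
      struct(lit(c).as("cols"), col(c).as("val"))
    }
    df.select(col(keyCol), explode(array(pairs: _*)).as("kv"))
      .select(col(keyCol), col("kv.cols"), col("kv.val"))
  }

  def main(args: Array[String]): Unit = {
    // Assumption: a local session, only for trying the sketch out.
    val spark = SparkSession.builder().master("local[*]").appName("struct-unpivot-sketch").getOrCreate()
    import spark.implicits._

    val df = Seq(("ISO", 1, 2, 3, 4), ("TEST", 5, 6, 7, 8), ("Beta", 9, 10, 11, 12))
      .toDF("Entity", "m1_amt", "m2_amt", "m3_amt", "m4_amt")

    val df1 = unpivot(df, "Entity")
    // Same pivot as above, but first() keeps the integer type.
    df1.groupBy("cols").pivot("Entity").agg(first("val"))
      .withColumnRenamed("cols", "Entity")
      .show()
    spark.stop()
  }
}
```

Since val stays an integer column here, the pivoted result can be used in arithmetic directly instead of being cast back from strings.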
Upvotes: 1