Reputation: 21
I am new to Spark and am working on a huge dataset of around 20 GB (many small files), and I need help transforming it.
I have data in this format:
+----------+-------------------------+-------------------+---------+------+
| id       | values                  | creation date     | leadTime| span |
+----------+-------------------------+-------------------+---------+------+
| id_1     | [[v1, 0.368], [v2, 0.5]]| 2020-07-15        | 16      | 15   |
| id_2     | [[v1, 0.368], [v2, 0.4]]| 2020-07-15        | 16      | 15   |
| id_1     | [[v1, 0.468], [v2, 0.3]]| 2020-07-15        | 17      | 18   |
| id_2     | [[v1, 0.368], [v2, 0.3]]| 2020-07-15        | 17      | 18   |
+----------+-------------------------+-------------------+---------+------+
I need the data in the format below, where a new column is created for each key in values and the column name is built from that key plus the leadTime and span values:
+----------+--------------+--------------------+--------------------+--------------------+--------------------+
| id       | creation date| final_v1_16_15_wk  | final_v2_16_15_wk  | final_v1_17_18_wk  | final_v2_17_18_wk  |
+----------+--------------+--------------------+--------------------+--------------------+--------------------+
| id_1     | 2020-07-15   | 0.368              | 0.5                | 0.468              | 0.3                |
| id_2     | 2020-07-15   | 0.368              | 0.4                | 0.368              | 0.3                |
+----------+--------------+--------------------+--------------------+--------------------+--------------------+
Here is a sample data frame:
val df = Seq(
  ("id_1", Map("v1" -> 0.368, "v2" -> 0.5, "v3" -> 0.6), "2020-07-15", 16, 15),
  ("id_1", Map("v1" -> 0.564, "v2" -> 0.78, "v3" -> 0.65), "2020-07-15", 17, 18),
  ("id_2", Map("v1" -> 0.468, "v2" -> 0.3, "v3" -> 0.66), "2020-07-15", 16, 15),
  ("id_2", Map("v1" -> 0.657, "v2" -> 0.65, "v3" -> 0.67), "2020-07-15", 17, 18)
).toDF("id", "values", "creation date", "leadTime", "span")
I tried to generate the column name/value using the logic below, but it did not work:
val modDF = finalDF.withColumn("final_" + newFinalDF("values").getItem(0).getItem("_1") + "_" + newFinalDF("leadTime") + "_" + newFinalDF("span") + "_wk", $"values".getItem(0).getItem("_2"));
Upvotes: 0
Views: 161
Reputation: 36
pivot can be used for this: explode the map, derive the column name from the key, leadTime and span, and then pivot on that derived column.
import org.apache.spark.sql.functions._

val explodeDf = df.select(col("id"), col("creation date"), explode_outer(col("values")), col("leadTime"), col("span"))

val finalDf = explodeDf.select(
  col("id"), col("creation date"), col("value"),
  concat(lit("final_"), col("key"), lit("_"), col("leadTime"), lit("_"), col("span"), lit("_wk")).as("colDerived"))

finalDf.groupBy(col("id"), col("creation date")).pivot(col("colDerived")).agg(sum(col("value"))).show()
+----+-------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+
| id|creation date|final_v1_16_15_wk|final_v1_17_18_wk|final_v2_16_15_wk|final_v2_17_18_wk|final_v3_16_15_wk|final_v3_17_18_wk|
+----+-------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|id_1| 2020-07-15| 0.368| 0.564| 0.5| 0.78| 0.6| 0.65|
|id_2| 2020-07-15| 0.468| 0.657| 0.3| 0.65| 0.66| 0.67|
+----+-------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+
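Since each (id, creation date, colDerived) combination holds exactly one value, the choice of aggregate hardly matters; first works just as well as sum. A small sketch under that assumption, keeping the pivoted result in a value instead of only showing it (pivotedDf is an illustrative name, not part of the code above):

// first() is sufficient here because every pivoted cell receives a single value.
val pivotedDf = finalDf
  .groupBy(col("id"), col("creation date"))
  .pivot(col("colDerived"))
  .agg(first(col("value")))

pivotedDf.show()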
Upvotes: 2
Reputation: 6568
You can use pivot after reshaping the values so that the map key, leadTime and span together form the column header. pivot performs better if you provide the list of column values upfront; otherwise Spark has to run a distinct on the pivot column to discover them (a sketch of that variant follows the output below).
import org.apache.spark.sql.functions._

val df = Seq(
  ("id_1", Map("v1" -> 0.368, "v2" -> 0.5, "v3" -> 0.6), "2020-07-15", 16, 15),
  ("id_1", Map("v1" -> 0.564, "v2" -> 0.78, "v3" -> 0.65), "2020-07-15", 17, 18),
  ("id_2", Map("v1" -> 0.468, "v2" -> 0.3, "v3" -> 0.66), "2020-07-15", 16, 15),
  ("id_2", Map("v1" -> 0.657, "v2" -> 0.65, "v3" -> 0.67), "2020-07-15", 17, 18)
).toDF("id", "values", "creation date", "leadTime", "span")
val df2 = df.select($"id", explode_outer($"values"), $"creation date", $"leadTime", $"span")
  .withColumn("keys", concat(lit("final_"), col("key")))
  .withColumn("leadTimes", concat(lit("_"), col("leadTime"), lit("_")))
  .withColumn("spans", concat(col("span"), lit("_wk")))
  .drop("leadTime", "key", "span")
  .withColumnRenamed("keys", "key")
  .withColumnRenamed("leadTimes", "leadTime")
  .withColumnRenamed("spans", "span")

val df3 = df2.groupBy($"id", $"creation date")
  .pivot(concat($"key", $"leadTime", $"span"))
  .agg(first("value"))

df3.show()
Output
+----+-------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+
| id|creation date|final_v1_16_15_wk|final_v1_17_18_wk|final_v2_16_15_wk|final_v2_17_18_wk|final_v3_16_15_wk|final_v3_17_18_wk|
+----+-------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|id_1| 2020-07-15| 0.368| 0.564| 0.5| 0.78| 0.6| 0.65|
|id_2| 2020-07-15| 0.468| 0.657| 0.3| 0.65| 0.66| 0.67|
+----+-------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+
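As noted above, supplying the pivot values explicitly skips the extra pass Spark needs to collect the distinct values of the pivot column. A minimal sketch, assuming the map keys and the (leadTime, span) combinations are known in advance; expectedCols and df3Explicit are illustrative names, not part of the original code:

// Hypothetical list of expected pivoted column names, built here by hand
// from the known keys (v1, v2, v3) and (leadTime, span) pairs in the sample data.
val expectedCols = Seq(
  "final_v1_16_15_wk", "final_v2_16_15_wk", "final_v3_16_15_wk",
  "final_v1_17_18_wk", "final_v2_17_18_wk", "final_v3_17_18_wk")

val df3Explicit = df2
  .withColumn("colDerived", concat($"key", $"leadTime", $"span"))
  .groupBy($"id", $"creation date")
  .pivot("colDerived", expectedCols) // no distinct scan of colDerived
  .agg(first("value"))

df3Explicit.show()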
https://databricks.com/blog/2016/02/09/reshaping-data-with-pivot-in-apache-spark.html
Upvotes: 0