mkl

Reputation: 21

Spark large data frame add new columns based on other column values

I am new to Spark and am working with a huge dataset, around 20 GB spread over many small files, and need help transforming it as described below.

I have data in this format:

+----------+-------------------------+-------------------+---------+------+
|   id     |       values            |     creation date | leadTime| span |
+----------+-------------------------+-------------------+---------+------+
|id_1      |[[v1, 0.368], [v2, 0.5]] |     2020-07-15    |      16 |  15  |
|id_2      |[[v1, 0.368], [v2, 0.4]] |     2020-07-15    |      16 |  15  |
|id_1      |[[v1, 0.468], [v2, 0.3]] |     2020-07-15    |      17 |  18  |
|id_2      |[[v1, 0.368], [v2, 0.3]] |     2020-07-15    |      17 |  18  | 
+----------+-------------------------+-------------------+---------+------+

I need the data in the format below, where each new column is named using the value key together with the leadTime and span column values:

+----------+--------------+--------------------+--------------------+--------------------+--------------------+
|   id     |creation date | final_v1_16_15_wk  | final_v2_16_15_wk  | final_v1_17_18_wk  | final_v2_17_18_wk  |
+----------+--------------+--------------------+--------------------+--------------------+--------------------+
|id_1      |2020-07-15    |       0.368        |         0.5        |       0.468        |         0.3        |
|id_2      |2020-07-15    |       0.368        |         0.4        |       0.368        |         0.3        |
+----------+--------------+--------------------+--------------------+--------------------+--------------------+

Here is a sample data frame:

val df = Seq(
  ("id_1", Map("v1" -> 0.368, "v2" -> 0.5, "v3" -> 0.6), "2020-07-15", 16, 15),
  ("id_1", Map("v1" -> 0.564, "v2" -> 0.78, "v3" -> 0.65), "2020-07-15", 17, 18),
  ("id_2", Map("v1" -> 0.468, "v2" -> 0.3, "v3" -> 0.66), "2020-07-15", 16, 15),
  ("id_2", Map("v1" -> 0.657, "v2" -> 0.65, "v3" -> 0.67), "2020-07-15", 17, 18)).toDF("id", "values", "creation date", "leadTime", "span")

I tried to generate the column names and values using the logic below, but it did not work:

val modDF = finalDF.withColumn("final_" + finalDF("values").getItem(0).getItem("_1") + "_" + finalDF("leadTime") + "_" + finalDF("span") + "_wk", $"values".getItem(0).getItem("_2"))
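For reference, withColumn takes a plain String as the new column's name, so concatenating Column objects into that string only appends each Column's toString. The attempt above therefore produces a single fixed column name rather than one column per leadTime/span combination. A minimal sketch of the effect, using the sample df above (the variable name attempt is only illustrative):

val attempt = df.withColumn("final_" + df("leadTime") + "_wk", df("span"))
// The name argument is evaluated once, so this yields one literal, oddly
// named column (not a name per row); print the schema to see it.
attempt.columns.foreach(println)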

Upvotes: 0

Views: 161

Answers (2)

dataenthusiast

Reputation: 36

You can use pivot for this: explode the map column, build the derived column name from the key, leadTime and span, then pivot on it.

import org.apache.spark.sql.functions._
// Explode the map into one row per (key, value) pair, then build the derived column name
val explodeDf = df.select(col("id"), col("creation date"), explode_outer(col("values")), col("leadTime"), col("span"))
val finalDf = explodeDf.select(col("id"), col("creation date"), col("value"),
  concat(lit("final_"), col("key"), lit("_"), col("leadTime"), lit("_"), col("span"), lit("_wk")).as("colDerived"))
// Pivot on the derived name to get one column per key/leadTime/span combination
finalDf.groupBy(col("id"), col("creation date")).pivot(col("colDerived")).agg(sum(col("value"))).show()

+----+-------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|  id|creation date|final_v1_16_15_wk|final_v1_17_18_wk|final_v2_16_15_wk|final_v2_17_18_wk|final_v3_16_15_wk|final_v3_17_18_wk|
+----+-------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|id_1|   2020-07-15|            0.368|            0.564|              0.5|             0.78|              0.6|             0.65|
|id_2|   2020-07-15|            0.468|            0.657|              0.3|             0.65|             0.66|             0.67|
+----+-------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+

See also: How to pivot Spark DataFrame?

Upvotes: 2

Abhi

Reputation: 6568

You can use pivot after reshaping the values so that they can serve as column headers. Pivot performs better if you provide the list of pivot values upfront; otherwise Spark has to run a distinct on the pivot column first to discover them (see the sketch after the output below).

import org.apache.spark.sql.functions._

val df = Seq(
  ("id_1", Map("v1" -> 0.368, "v2" -> 0.5, "v3" -> 0.6), "2020-07-15", 16, 15),
  ("id_1", Map("v1" -> 0.564, "v2" -> 0.78, "v3" -> 0.65), "2020-07-15", 17, 18),
  ("id_2", Map("v1" -> 0.468, "v2" -> 0.3, "v3" -> 0.66), "2020-07-15", 16, 15),
  ("id_2", Map("v1" -> 0.657, "v2" -> 0.65, "v3" -> 0.67), "2020-07-15", 17, 18)
).toDF("id", "values", "creation date", "leadTime", "span")

// Explode the map and assemble the pieces of the target column name
val df2 = df.select($"id", explode_outer($"values"), $"creation date", $"leadTime", $"span")
  .withColumn("keys", concat(lit("final_"), col("key")))
  .withColumn("leadTimes", concat(lit("_"), col("leadTime"), lit("_")))
  .withColumn("spans", concat(col("span"), lit("_wk")))
  .drop("leadTime", "key", "span")
  .withColumnRenamed("keys", "key")
  .withColumnRenamed("leadTimes", "leadTime")
  .withColumnRenamed("spans", "span")

// Pivot on the concatenated name and keep the first value per group
val df3 = df2.groupBy($"id", $"creation date").pivot(concat($"key", $"leadTime", $"span")).agg(first("value"))
df3.show()

Output

+----+-------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|  id|creation date|final_v1_16_15_wk|final_v1_17_18_wk|final_v2_16_15_wk|final_v2_17_18_wk|final_v3_16_15_wk|final_v3_17_18_wk|
+----+-------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|id_1|   2020-07-15|            0.368|            0.564|              0.5|             0.78|              0.6|             0.65|
|id_2|   2020-07-15|            0.468|            0.657|              0.3|             0.65|             0.66|             0.67|
+----+-------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+
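As mentioned above, if the derived column names are known in advance you can pass them to pivot explicitly, which lets Spark skip the extra job that computes the distinct pivot values. A minimal sketch reusing df2 from the snippet above; the hard-coded list simply mirrors the six columns in the output, and Spark 2.4+ is assumed for the pivot(Column, values) overload:

// Explicit pivot values: the known key/leadTime/span combinations
val pivotValues = Seq(
  "final_v1_16_15_wk", "final_v2_16_15_wk", "final_v3_16_15_wk",
  "final_v1_17_18_wk", "final_v2_17_18_wk", "final_v3_17_18_wk")

// Passing the values avoids a separate distinct pass over the pivot column
val df3Explicit = df2
  .groupBy($"id", $"creation date")
  .pivot(concat($"key", $"leadTime", $"span"), pivotValues)
  .agg(first("value"))
df3Explicit.show()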

https://databricks.com/blog/2016/02/09/reshaping-data-with-pivot-in-apache-spark.html

Upvotes: 0
