kodi1911

Reputation: 722

Spark Scala groupBy multiple columns with values

I have the following data frame (df) in Spark:

| group_1 | group_2 | year | value |
| "School1" | "Student" | 2018 | name_aaa |
| "School1" | "Student" | 2018 | name_bbb |
| "School1" | "Student" | 2019 | name_aaa |
| "School2" | "Student" | 2019 | name_aaa |

What I want to have is

| group_1 | group_2 | values_map |
| "School1" | "Student" | [2018 -> [name_aaa, name_bbb], 2019 -> [name_aaa]] |
| "School2" | "Student" | [2019 -> [name_aaa]] |

I tried it with groupBy and collect_list() & map(), but it didn't work: the resulting map kept only the last of name_aaa and name_bbb for a given year. How can I achieve this with Apache Spark?
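A guess at the shape of that failing attempt (the original code isn't shown): zipping the collected years and values into a Scala Map keeps only the last entry per duplicate key, which reproduces the symptom.

import org.apache.spark.sql.functions._

// Hypothetical reconstruction -- not the asker's actual code.
// Assumes year is an integer column. Scala's .toMap keeps only the
// last pair for a repeated key, so 2018 -> name_aaa is silently
// replaced by 2018 -> name_bbb.
val toMapUdf = udf((years: Seq[Int], values: Seq[String]) => years.zip(values).toMap)

df.groupBy("group_1", "group_2")
  .agg(toMapUdf(collect_list(col("year")), collect_list(col("value"))).as("values_map"))
  .show(false)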

Upvotes: 0

Views: 4847

Answers (2)

Lamanus

Reputation: 13541

The result of the other answer is an array type, not a map. Here is a way to get a map-type column for your result.

import org.apache.spark.sql.functions._

// collect the names per (group_1, group_2, year), then fold the
// (year, value_list) pairs into one map per (group_1, group_2)
df.groupBy("group_1", "group_2", "year").agg(collect_list("value").as("value_list"))
  .groupBy("group_1", "group_2").agg(collect_list(struct(col("year"), col("value_list"))).as("map_list"))
  .withColumn("values_map", map_from_entries(col("map_list"))) // map_from_entries needs Spark 2.4+
  .drop("map_list")
  .show(false)

No UDF is needed, and the result matches your expected output directly.

+-------+-------+--------------------------------------------------+
|group_1|group_2|values_map                                        |
+-------+-------+--------------------------------------------------+
|School2|Student|[2019 -> [name_aaa]]                              |
|School1|Student|[2018 -> [name_aaa, name_bbb], 2019 -> [name_aaa]]|
+-------+-------+--------------------------------------------------+
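As a usage sketch (assuming the chain above is assigned to a val result before the final show, and that year is an integer column), individual years can be looked up straight from the map:

// element_at (Spark 2.4+) returns null when the key is absent
result
  .select(col("group_1"), col("group_2"), element_at(col("values_map"), 2018).as("names_2018"))
  .show(false)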

Upvotes: 5

Anand Sai

Reputation: 1586

One possible solution:

scala> df1.show
+-------+-------+----+--------+
|group_1|group_2|year|   value|
+-------+-------+----+--------+
|school1|student|2018|name_aaa|
|school1|student|2018|name_bbb|
|school1|student|2019|name_aaa|
|school2|student|2019|name_aaa|
+-------+-------+----+--------+


scala> val df2 = df1.groupBy("group_1","group_2","year").agg(collect_list('value).as("value"))
df2: org.apache.spark.sql.DataFrame = [group_1: string, group_2: string ... 2 more fields]

scala> df2.show
+-------+-------+----+--------------------+
|group_1|group_2|year|               value|
+-------+-------+----+--------------------+
|school1|student|2018|[name_aaa, name_bbb]|
|school1|student|2019|          [name_aaa]|
|school2|student|2019|          [name_aaa]|
+-------+-------+----+--------------------+


scala> val myUdf = udf((year: String, values: Seq[String]) => Map(year -> values))
myUdf: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function2>,MapType(StringType,ArrayType(StringType,true),true),Some(List(StringType, ArrayType(StringType,true))))

scala> val df3 = df2.withColumn("values",myUdf($"year",$"value")).drop("year","value")
df3: org.apache.spark.sql.DataFrame = [group_1: string, group_2: string ... 1 more field]
scala> val df4 = df3.groupBy("group_1","group_2").agg(collect_list("values").as("value_map"))
df4: org.apache.spark.sql.DataFrame = [group_1: string, group_2: string ... 1 more field]

scala> df4.printSchema
root
 |-- group_1: string (nullable = true)
 |-- group_2: string (nullable = true)
 |-- value_map: array (nullable = true)
 |    |-- element: map (containsNull = true)
 |    |    |-- key: string
 |    |    |-- value: array (valueContainsNull = true)
 |    |    |    |-- element: string (containsNull = true)


scala> df4.show(false)
+-------+-------+------------------------------------------------------+
|group_1|group_2|value_map                                             |
+-------+-------+------------------------------------------------------+
|school1|student|[[2018 -> [name_aaa, name_bbb]], [2019 -> [name_aaa]]]|
|school2|student|[[2019 -> [name_aaa]]]                                |
+-------+-------+------------------------------------------------------+
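Note that value_map here is an array of single-entry maps, not one map column. If a true map is needed from this result, one way (a sketch; transform, map_entries, flatten, and map_from_entries all require Spark 2.4+) is to flatten the entries and rebuild the map:

import org.apache.spark.sql.functions.expr

// merge [[2018 -> ...], [2019 -> ...]] into [2018 -> ..., 2019 -> ...]
val df5 = df4.withColumn(
  "values_map",
  expr("map_from_entries(flatten(transform(value_map, m -> map_entries(m))))")
).drop("value_map")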

Let me know if it helps!!

Upvotes: 2
