Reputation: 722
I have a following data frame (df
) in spark
| group_1 | group_2 | year | value |
| "School1" | "Student" | 2018 | name_aaa |
| "School1" | "Student" | 2018 | name_bbb |
| "School1" | "Student" | 2019 | name_aaa |
| "School2" | "Student" | 2019 | name_aaa |
What I want to have is
| group_1 | group_2 | values_map |
| "School1" | "Student" | [2018 -> [name_aaa, name_bbb], [2019 -> [name_aaa] |
| "School2" | "Student" | [2019 -> [name_aaa] |
I tried it with groupBy
and collect_list()
& map()
but it didn't work. It created a map with only last value from name_aaa
or name_bbb
. How can I achieve that with Apache Spark?
Upvotes: 0
Views: 4847
Reputation: 13541
The result of the other answer is an array type not a map. Here is the way to achieve the map
type column for your result.
df.groupBy("group_1", "group_2", "year").agg(collect_list("value").as("value_list"))
.groupBy("group_1", "group_2").agg(collect_list(struct(col("year"), col("value_list"))).as("map_list"))
.withColumn("values_map", map_from_entries(col("map_list")))
.drop("map_list")
.show(false)
I haven't used an udf
. Then, the result directly shows your expected one.
+-------+-------+--------------------------------------------------+
|group_1|group_2|values_map |
+-------+-------+--------------------------------------------------+
|School2|Student|[2019 -> [name_aaa]] |
|School1|Student|[2018 -> [name_aaa, name_bbb], 2019 -> [name_aaa]]|
+-------+-------+--------------------------------------------------+
Upvotes: 5
Reputation: 1586
The solution could be:
scala> df1.show
+-------+-------+----+--------+
|group_1|group_2|year| value|
+-------+-------+----+--------+
|school1|student|2018|name_aaa|
|school1|student|2018|name_bbb|
|school1|student|2019|name_aaa|
|school2|student|2019|name_aaa|
+-------+-------+----+--------+
scala> val df2 = df1.groupBy("group_1","group_2","year").agg(collect_list('value).as("value"))
df2: org.apache.spark.sql.DataFrame = [group_1: string, group_2: string ... 2 more fields]
scala> df2.show
+-------+-------+----+--------------------+
|group_1|group_2|year| value|
+-------+-------+----+--------------------+
|school1|student|2018|[name_aaa, name_bbb]|
|school1|student|2019| [name_aaa]|
|school2|student|2019| [name_aaa]|
+-------+-------+----+--------------------+
scala> val myUdf = udf((year: String, values: Seq[String]) => Map(year -> values))
myUdf: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function2>,MapType(StringType,ArrayType(StringType,true),true),Some(List(StringType, ArrayType(StringType,true))))
scala> val df3 = df2.withColumn("values",myUdf($"year",$"value")).drop("year","value")
df3: org.apache.spark.sql.DataFrame = [group_1: string, group_2: string ... 1 more field]
scala> val df4 = df3.groupBy("group_1","group_2").agg(collect_list("values").as("value_map"))
df4: org.apache.spark.sql.DataFrame = [group_1: string, group_2: string ... 1 more field]
scala> df4.printSchema
root
|-- group_1: string (nullable = true)
|-- group_2: string (nullable = true)
|-- value_map: array (nullable = true)
| |-- element: map (containsNull = true)
| | |-- key: string
| | |-- value: array (valueContainsNull = true)
| | | |-- element: string (containsNull = true)
scala> df4.show(false)
+-------+-------+------------------------------------------------------+
|group_1|group_2|value_map |
+-------+-------+------------------------------------------------------+
|school1|student|[[2018 -> [name_aaa, name_bbb]], [2019 -> [name_aaa]]]|
|school2|student|[[2019 -> [name_aaa]]] |
+-------+-------+------------------------------------------------------+
Let me know if it helps!!
Upvotes: 2