Reputation: 25
I want to convert this into a form that looks like (E -> W.P Kinsella, Iseult Teran, Nic Rowley)(A -> Jason Shinder, Bernie.....), and then load that format into HDFS. Please suggest.
Upvotes: 0
Views: 87
Reputation: 10382
Check the code below. It assumes import org.apache.spark.sql.functions._ is in scope (spark-shell already provides spark.implicits._, which the $ syntax needs).
scala>
df
.groupBy($"Age_Group")
.agg(collect_set($"Book_Author").as("Book_Author"))
.select(map($"Age_Group",$"Book_Author").as("data"))
.show(false)
+--------------------------------------------------+
|data |
+--------------------------------------------------+
|[E -> [Iseult Teran, W.P.Kinsella, W. P Kinsella]]|
|[A -> [Jason Shinder, C.S Lewis]] |
+--------------------------------------------------+
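For intuition, here is a hypothetical plain-Scala sketch (no Spark needed) of what the groupBy + collect_set step builds, using the same sample rows; the names rows and data are illustrative only:

```scala
// Sample rows matching the DataFrame above: (Age_Group, Book_Author).
val rows = Seq(
  ("E", "W.P.Kinsella"),
  ("E", "W. P Kinsella"),
  ("E", "Iseult Teran"),
  ("A", "C.S Lewis"),
  ("A", "Jason Shinder")
)

// Group by Age_Group and collect distinct authors, like collect_set.
val data: Map[String, Set[String]] =
  rows.groupBy(_._1).map { case (age, pairs) => age -> pairs.map(_._2).toSet }

println(data("E")) // both Kinsella spellings survive until deduplication
```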
Write to HDFS
df
.groupBy($"Age_Group")
.agg(collect_set($"Book_Author").as("Book_Author"))
.select(map($"Age_Group",$"Book_Author").as("data"))
.write
.format("orc")
.save("<hdfs_path>")
Removing Duplicates
For example, the Book_Author values W.P.Kinsella and W. P Kinsella above refer to the same author; the names are almost identical. The code below normalizes such near-duplicates and keeps only W.P.Kinsella.
scala> import org.apache.spark.sql.expressions._
import org.apache.spark.sql.expressions._
scala> df.show(false)
+---------+-------------+
|Age_Group|Book_Author |
+---------+-------------+
|E |W.P.Kinsella |
|E |W. P Kinsella|
|E |Iseult Teran |
|A |C.S Lewis |
|A |Jason Shinder|
+---------+-------------+
scala> val windowExpr = first($"Book_Author")
.over(
Window
.partitionBy(lower(regexp_replace($"Book_Author","[ .]","")))
.orderBy($"Age_Group".asc)
)
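The partitionBy key strips spaces and dots and lowercases the name, so both Kinsella spellings land in the same window partition and first() picks one canonical value. A standalone sketch of that normalization in plain Scala (normKey is a hypothetical helper name, not part of the Spark code above):

```scala
// Mirrors lower(regexp_replace($"Book_Author", "[ .]", "")):
// remove spaces and dots, then lowercase, so spelling variants collide.
def normKey(author: String): String =
  author.replaceAll("[ .]", "").toLowerCase

println(normKey("W.P.Kinsella"))  // wpkinsella
println(normKey("W. P Kinsella")) // wpkinsella
```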
scala> df
.withColumn("Book_Author",windowExpr)
.groupBy($"Age_Group")
.agg(collect_set($"Book_Author").as("Book_Author"))
.select(map($"Age_Group",$"Book_Author").as("data"))
.show(false)
+-----------------------------------+
|data |
+-----------------------------------+
|[E -> [Iseult Teran, W.P.Kinsella]]|
|[A -> [Jason Shinder, C.S Lewis]] |
+-----------------------------------+
Upvotes: 1