I am new to Apache Spark. Below is my Spark dataframe, created when I read a CSV file.
Parent Keyword Volume
P1 K1 100
P1 K2 200
P1 K3 150
P2 K4 100
P2 K5 200
I need to convert the above dataframe to the one below. The logic is that all keywords belonging to the same parent are related, and they should be listed in sorted order by Volume. For example, K1, K2, and K3 belong to the same parent P1, so they are all related. So for K1, the related keywords are K2 and K3; K2 is displayed first because its volume (200) is greater than K3's (150).
Keyword Related_keywords
K1 K2, K3
K2 K3, K1
K3 K2, K1
K4 K5
K5 K4
I am new to Spark. Looking at this problem, groupBy may be what's needed, but I don't know exactly how to turn the first dataframe into the second.
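Put differently, here is the transformation I want, sketched in plain Scala collections just to pin down the logic (no Spark; the values are copied from the example above):

```scala
// Group rows by parent; for each keyword, list its sibling keywords
// sorted by descending volume.
case class Rec(parent: String, keyword: String, volume: Int)

val rows = Seq(
  Rec("P1", "K1", 100), Rec("P1", "K2", 200), Rec("P1", "K3", 150),
  Rec("P2", "K4", 100), Rec("P2", "K5", 200))

val related: Map[String, Seq[String]] =
  rows.groupBy(_.parent).values.flatMap { group =>
    group.map { r =>
      r.keyword -> group
        .filter(_.keyword != r.keyword) // exclude the keyword itself
        .sortBy(-_.volume)              // descending volume
        .map(_.keyword)
    }
  }.toMap

// related("K1") == Seq("K2", "K3")
```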
While this can be done using groupBy, window functions are usually easier when you need all the original rows in the result dataframe. We can use collect_list, but as the docs say, its order is nondeterministic, so let's create tuples of (negated) volumes and keywords:
// Requires import spark.implicits._ for .toDS()
val txt =
"""Parent Keyword Volume
|P1 K1 100
|P1 K2 200
|P1 K3 150
|P2 K4 100
|P2 K5 200""".stripMargin.linesIterator // .lines clashes with java.lang.String.lines on JDK 11+
.map(_.split("\\s+").mkString("|"))
.toSeq
.toDS()
val df = spark.read
.option("inferSchema", true)
.option("header", true)
.option("delimiter", "|")
.csv(txt)
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
val win = Window.partitionBy($"Parent")
val df1 =
df.select($"Keyword",
collect_list(struct(-$"Volume", $"Keyword")).over(win) as "rel")
Now we almost have the desired format:
df1.select(array_sort($"rel") as "Related_keywords")
.show(20, false)
Output:
+------------------------------------+
|Related_keywords |
+------------------------------------+
|[[-200, K5], [-100, K4]] |
|[[-200, K5], [-100, K4]] |
|[[-200, K2], [-150, K3], [-100, K1]]|
|[[-200, K2], [-150, K3], [-100, K1]]|
|[[-200, K2], [-150, K3], [-100, K1]]|
+------------------------------------+
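The negated volume is what makes array_sort's default ascending order come out as descending volume: tuples (and structs) compare field by field. The same trick on plain Scala tuples, for illustration:

```scala
// Tuples sort lexicographically; negating the first element turns the
// default ascending sort into a descending sort by volume.
val rel = Seq((-100, "K1"), (-200, "K2"), (-150, "K3"))
val sorted = rel.sorted // == Seq((-200, "K2"), (-150, "K3"), (-100, "K1"))
```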
However, there are two issues: the original Keyword would be duplicated in its own list, and the negative volume sits in front of every keyword. To make this prettier, I believe UDFs are needed (I could not find a SQL function for unzipping the tuples):
val myudf = udf(
(keyword: String, rel: Seq[Row]) =>
rel
.collect {
// keep every (volume, keyword) pair except the keyword itself
case Row(volume: Int, kw: String) if kw != keyword => (volume, kw)
}
.sortBy(_._1) // volumes were negated above, so ascending = descending volume
.map(_._2))
df1.select($"Keyword", myudf($"Keyword", $"rel") as "Related_keywords")
.show(20, false)
Output:
+-------+----------------+
|Keyword|Related_keywords|
+-------+----------------+
|K4 |[K5] |
|K5 |[K4] |
|K1 |[K2, K3] |
|K2 |[K3, K1] |
|K3 |[K2, K1] |
+-------+----------------+
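For what it's worth, the UDF body is ordinary Scala collection code, so it can be unit-tested without Spark. A sketch of the same filter/sort/unzip logic on plain tuples (a hypothetical helper, not part of the answer's code):

```scala
// Same logic as the UDF: drop the keyword itself, sort by the (already
// negated) volume, and keep only the keywords.
def relatedKeywords(keyword: String, rel: Seq[(Int, String)]): Seq[String] =
  rel
    .collect { case (volume, kw) if kw != keyword => (volume, kw) }
    .sortBy(_._1) // volumes are negated upstream, so this is descending volume
    .map(_._2)

// relatedKeywords("K1", Seq((-100, "K1"), (-200, "K2"), (-150, "K3")))
//   == Seq("K2", "K3")
```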