Anand

Reputation: 21310

Grouping in Apache Spark dataframe

I am new to Apache Spark. Below is my Spark dataframe, created when I read a CSV file.

Parent  Keyword   Volume
P1       K1        100
P1       K2        200
P1       K3        150
P2       K4        100
P2       K5        200

I need to convert the above dataframe to the one below. The logic is that all keywords belonging to the same parent are related and should be listed in descending order of Volume. For example, K1, K2 and K3 belong to the same parent P1, so they are all related. So, for K1, the related keywords are K2 and K3; K2 is displayed first because its volume (200) is greater than K3's (150).

Keyword   Related_keywords
K1         K2, K3
K2         K3, K1
K3         K2, K1
K4         K5
K5         K4

I am new to Spark, and looking at this problem, groupBy seems applicable, but I don't know exactly how to turn the first dataframe into the second.

Upvotes: 1

Views: 144

Answers (1)

ollik1

Reputation: 4540

While this can be done using groupBy, window functions are usually easier when you need all the original rows in the result data frame. We could use collect_list, but as the docs say its order is non-deterministic, so let's instead collect tuples of (negated) volumes and keywords:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._

// Recreate the sample data as a pipe-delimited Dataset[String];
// linesIterator avoids the String#lines ambiguity on newer JDKs.
val txt =
  """Parent  Keyword   Volume
    |P1       K1        100
    |P1       K2        200
    |P1       K3        150
    |P2       K4        100
    |P2       K5        200""".stripMargin.linesIterator
    .map(_.split("\\s+").mkString("|"))
    .toSeq
    .toDS()
val df = spark.read
  .option("inferSchema", true)
  .option("header", true)
  .option("delimiter", "|")
  .csv(txt)

// Collect (-Volume, Keyword) structs per parent so that sorting
// the array later orders the keywords by descending volume.
val win = Window.partitionBy($"Parent")
val df1 =
  df.select($"Keyword",
            collect_list(struct(-$"Volume", $"Keyword")).over(win) as "rel")

Now we almost have the desired format:

df1.select(array_sort($"rel") as "Related_keywords")
  .show(20, false)

Output:

+------------------------------------+
|Related_keywords                    |
+------------------------------------+
|[[-200, K5], [-100, K4]]            |
|[[-200, K5], [-100, K4]]            |
|[[-200, K2], [-150, K3], [-100, K1]]|
|[[-200, K2], [-150, K3], [-100, K1]]|
|[[-200, K2], [-150, K3], [-100, K1]]|
+------------------------------------+

However, there are two issues: the original Keyword is duplicated in the list, and the negated volume sits in front of every keyword. To make this prettier, I believe UDFs are needed (I could not find a SQL function for unzipping the tuples):

import org.apache.spark.sql.Row

// Drop the row's own keyword, sort by the negated volume (ascending,
// i.e. by descending Volume) and keep only the keywords.
val myudf = udf(
  (keyword: String, rel: Seq[Row]) =>
    rel
      .collect {
        case Row(volume: Int, kw: String) if kw != keyword => (volume, kw)
      }
      .sortBy(_._1)
      .map(_._2))

df1.select($"Keyword", myudf($"Keyword", $"rel") as "Related_keywords")
  .show(20, false)

Output:

+-------+----------------+
|Keyword|Related_keywords|
+-------+----------------+
|K4     |[K5]            |
|K5     |[K4]            |
|K1     |[K2, K3]        |
|K2     |[K3, K1]        |
|K3     |[K2, K1]        |
+-------+----------------+
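On Spark 2.4+, a similar result can likely be achieved without a UDF by combining array_sort with the built-in higher-order functions filter and transform. This is an untested sketch: it assumes the struct field holding the keyword is named Keyword, as produced by struct(-$"Volume", $"Keyword") above (the negated volume gets the generated name col1):

// Sketch, assuming Spark 2.4+: sort by negated volume, drop the
// row's own keyword, then keep only the keyword field of each struct.
df1.select(
    $"Keyword",
    expr("transform(filter(array_sort(rel), x -> x.Keyword != Keyword), x -> x.Keyword)")
      as "Related_keywords")
  .show(20, false)

Inside the lambda, the bare Keyword resolves to the outer column while x.Keyword refers to the struct field, which is what makes the self-exclusion work.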

Upvotes: 1
