Josh

Reputation: 107

Consider items of the same value when deciding rank

In Spark, I would like to count how many values are less than or equal to other values. I tried to accomplish this via ranking, but rank produces [1,2,2,2,3,4] -> [1,2,2,2,5,6], while what I would like is [1,2,2,2,3,4] -> [1,4,4,4,5,6].

I can accomplish this by ranking, grouping by the rank, and then modifying the rank value based on how many items are in the group. But this is clunky and inefficient. Is there a better way to do it?

Edit: Added a minimal example of what I'm trying to accomplish.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.rank
import org.apache.spark.sql.expressions.Window


object Question extends App {
  val spark = SparkSession.builder.appName("Question").master("local[*]").getOrCreate()

  import spark.implicits._

  val win = Window.orderBy($"nums".asc)

  // Clunky workaround: rank, group by rank, then shift each rank by the
  // group size so ties get the rank of the last tied element.
  Seq(1, 2, 2, 2, 3, 4)
    .toDF("nums")
    .select($"nums", rank.over(win).alias("rank"))
    .as[(Int, Int)]
    .groupByKey(_._2)                                          // group rows by their rank
    .mapGroups((rank, nums) => (rank, nums.toList.map(_._1)))  // collect the values sharing this rank
    .map(x => (x._1 + x._2.length - 1, x._2))                  // bump rank by (group size - 1)
    .flatMap(x => x._2.map(num => (num, x._1)))                // expand back to one row per value
    .toDF("nums", "rank")
    .show(false)
}

Output:

+----+----+
|nums|rank|
+----+----+
|1   |1   |
|2   |4   |
|2   |4   |
|2   |4   |
|3   |5   |
|4   |6   |
+----+----+

Upvotes: 0

Views: 560

Answers (2)

stack0114106

Reputation: 8711

Use window functions

scala> val df =  Seq(1, 2, 2, 2, 3, 4).toDF("nums")
df: org.apache.spark.sql.DataFrame = [nums: int]

scala> df.createOrReplaceTempView("tbl")

scala> spark.sql(" with tab1(select nums, rank() over(order by nums) rk, count(*) over(partition by nums) cn from tbl) select nums, rk+cn-1 as rk2 from tab1 ").show(false)
18/11/28 02:20:55 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
+----+---+
|nums|rk2|
+----+---+
|1   |1  |
|2   |4  |
|2   |4  |
|2   |4  |
|3   |5  |
|4   |6  |
+----+---+


scala>

Note that the DataFrame isn't partitioned on any column, so Spark warns about moving all the data to a single partition.
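The warning is unavoidable when you need one global ranking; it goes away only if a per-group rank is what you actually want. A sketch, assuming a hypothetical grouping column grp in the data:

// Hypothetical: assumes tbl had an extra grouping column "grp".
// Both windows are partitioned on it, so the rank is computed within each
// group and the data is no longer shuffled to a single partition.
spark.sql("""
  select grp, nums,
         rank() over (partition by grp order by nums)
           + count(*) over (partition by grp, nums) - 1 as rk2
  from tbl
""").show(false)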

EDIT1:

scala> spark.sql(" select nums, rank() over(order by nums) + count(*) over(partition by nums) -1 as rk2 from tbl ").show
18/11/28 23:20:09 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
+----+---+
|nums|rk2|
+----+---+
|   1|  1|
|   2|  4|
|   2|  4|
|   2|  4|
|   3|  5|
|   4|  6|
+----+---+


scala>

EDIT2:

The equivalent DataFrame version:

scala> val df =  Seq(1, 2, 2, 2, 3, 4).toDF("nums")
df: org.apache.spark.sql.DataFrame = [nums: int]

scala> import org.apache.spark.sql.expressions._
import org.apache.spark.sql.expressions._

scala> df.withColumn("rk2", rank().over(Window orderBy 'nums)+ count(lit(1)).over(Window.partitionBy('nums)) - 1 ).show(false)
2018-12-01 11:10:26 WARN  WindowExec:66 - No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
+----+---+
|nums|rk2|
+----+---+
|1   |1  |
|2   |4  |
|2   |4  |
|2   |4  |
|3   |5  |
|4   |6  |
+----+---+


scala>

Upvotes: 2

Josh

Reputation: 107

A friend pointed out that I can just calculate the rank in descending order and then, for each rank, compute (max_rank + 1) - current_rank. This is a much more efficient implementation. A window-only variant of the same idea is sketched after the output below.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.rank
import org.apache.spark.sql.expressions.Window


object Question extends App {
  val spark = SparkSession.builder.appName("Question").master("local[*]").getOrCreate()

  import spark.implicits._


  // Rank in descending order, so the largest value gets rank 1.
  val win = Window.orderBy($"nums".desc)
  val rankings = Seq(1, 2, 2, 2, 3, 4)
    .toDF("nums")
    .select($"nums", rank.over(win).alias("rank"))
    .as[(Int, Int)]

  // Global maximum of the descending rank.
  val maxElement = rankings.select("rank").as[Int].reduce((a, b) => if (a > b) a else b)

  rankings
    .map(x => x.copy(_2 = maxElement - x._2 + 1)) // (max_rank + 1) - current_rank
    .toDF("nums", "rank")
    .orderBy("rank")
    .show(false)
}

Output:

+----+----+
|nums|rank|
+----+----+
|1   |1   |
|2   |4   |
|2   |4   |
|2   |4   |
|3   |5   |
|4   |6   |
+----+----+
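The same (max_rank + 1) - current_rank idea can also be written purely with window functions, so the maximum rank doesn't have to be pulled out with a separate reduce. A sketch, assuming the same SparkSession and spark.implicits._ as above:

import org.apache.spark.sql.functions.{max, rank}
import org.apache.spark.sql.expressions.Window

// Descending rank, as in the program above.
val byNumsDesc = Window.orderBy($"nums".desc)
// Unbounded, unpartitioned frame used to pick up the global maximum rank;
// like the other windows here, it triggers the single-partition warning.
val allRows = Window.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

Seq(1, 2, 2, 2, 3, 4)
  .toDF("nums")
  .withColumn("rank_desc", rank().over(byNumsDesc))
  .withColumn("rank", max($"rank_desc").over(allRows) - $"rank_desc" + 1)
  .drop("rank_desc")
  .orderBy("rank")
  .show(false)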

Upvotes: 0
