Michele Vascellari

Reputation: 53

Rank values in Spark on a column based on previous values

I have a dataframe like this:

import datetime as dt

df = spark.createDataFrame(
    [
        (dt.datetime(2021, 5, 1, 10, 30, 0), 2.15, "a"),
        (dt.datetime(2021, 5, 1, 10, 30, 10), 2.12, "a"),
        (dt.datetime(2021, 5, 1, 10, 30, 20), 2.13, "a"),
        (dt.datetime(2021, 5, 1, 10, 30, 50), 2.14, "a"),
        (dt.datetime(2021, 5, 1, 10, 31, 5), 2.13, "a"),
        (dt.datetime(2021, 5, 1, 10, 31, 10), 2.16, "a"),
        (dt.datetime(2021, 5, 1, 10, 31, 10), 2.16, "b"),
    ],
    ["ts", "value", "group"]
)

I want to get the rank of the value column within each group, using only the values seen so far (rows ordered by the timestamp ts). For example, for the row at 10:31:05 (value 2.13), the distinct earlier values in group a are 2.12, 2.13, 2.14 and 2.15, so its rank is 2:

+-------------------+-----+-----+----+
|                 ts|value|group|rank|
+-------------------+-----+-----+----+
|2021-05-01 10:30:00| 2.15|    a|   1|
|2021-05-01 10:30:10| 2.12|    a|   1|
|2021-05-01 10:30:20| 2.13|    a|   2|
|2021-05-01 10:30:50| 2.14|    a|   3|
|2021-05-01 10:31:05| 2.13|    a|   2|
|2021-05-01 10:31:10| 2.16|    a|   5|
|2021-05-01 10:31:10| 2.16|    b|   1|
+-------------------+-----+-----+----+

I tried the following code:

from pyspark.sql import functions as f
from pyspark.sql.window import Window

w = (
    Window
    .partitionBy("group")
    .orderBy("ts")
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)
df.select(
    "*", 
    f.rank().over(w).alias("rank")
).show()

but it basically ranks the rows only by the timestamp.

Any idea how to do it?
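To pin down the semantics the expected output implies, here is a hypothetical pure-Python reference (not a Spark solution; running_rank is a made-up helper): each row is ranked by the position of its value among the distinct values seen so far within its group.

import bisect

def running_rank(rows):
    # Pure-Python sketch of the requested semantics, not a Spark solution:
    # process rows in timestamp order and rank each row by the position of
    # its value among the distinct values seen so far in its group.
    seen = {}  # group -> sorted list of distinct values seen so far
    for ts, value, group in sorted(rows, key=lambda r: r[0]):
        vals = seen.setdefault(group, [])
        if value not in vals:
            bisect.insort(vals, value)
        yield ts, value, group, vals.index(value) + 1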

Upvotes: 2

Views: 977

Answers (2)

AdibP

Reputation: 2939

The rank function ranks rows by the window's orderBy clause, so you cannot order the window by ts and rank by value at the same time. You can use this as an alternative:

from pyspark.sql import functions as F

# w is the running window defined in the question (per group, ordered by ts,
# frame from unboundedPreceding to currentRow)
df = (df
      .withColumn("rank", F.array_sort(F.collect_set("value").over(w)))  # distinct values so far, sorted
      .withColumn("rank", F.expr("array_position(rank, value)")))        # 1-based position = rank
df.show()

+-------------------+-----+-----+----+
|                 ts|value|group|rank|
+-------------------+-----+-----+----+
|2021-05-01 10:31:10| 2.16|    b|   1|
|2021-05-01 10:30:00| 2.15|    a|   1|
|2021-05-01 10:30:10| 2.12|    a|   1|
|2021-05-01 10:30:20| 2.13|    a|   2|
|2021-05-01 10:30:50| 2.14|    a|   3|
|2021-05-01 10:31:05| 2.13|    a|   2|
|2021-05-01 10:31:10| 2.16|    a|   5|
+-------------------+-----+-----+----+

Note that because collect_set keeps only distinct values, this already behaves like a running dense_rank (which is what the expected output above shows); if you want classic rank semantics, where duplicates push later values down, use collect_list instead of collect_set.
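For reference, a minimal sketch of that collect_list variant (my own illustration, reusing the same window w from the question; with it, the 2.16 row at 10:31:10 in group a would get rank 6 instead of 5):

from pyspark.sql import functions as F

# collect_list keeps duplicates, so the 1-based position of the first
# occurrence of the current value matches classic rank() semantics
df_rank = (df
           .withColumn("rank", F.array_sort(F.collect_list("value").over(w)))
           .withColumn("rank", F.expr("array_position(rank, value)")))
df_rank.show()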

Upvotes: 2

dsk

Reputation: 2003

Change your orderBy() column to value:

import datetime as dt
from pyspark.sql import functions as F
from pyspark.sql.window import Window as W

df = spark.createDataFrame(
    [
        (dt.datetime(2021, 5, 1, 10, 30, 0), 2.15, "a"),
        (dt.datetime(2021, 5, 1, 10, 30, 10), 2.12, "a"),
        (dt.datetime(2021, 5, 1, 10, 30, 20), 2.13, "a"),
        (dt.datetime(2021, 5, 1, 10, 30, 50), 2.14, "a"),
        (dt.datetime(2021, 5, 1, 10, 31, 5), 2.13, "a"),
        (dt.datetime(2021, 5, 1, 10, 31, 10), 2.16, "b"),
        (dt.datetime(2021, 5, 1, 10, 31, 11), 2.17, "b"),
    ],
    ["ts", "value", "group"]
)
w = (
    W
    .partitionBy("group")
    .orderBy("value")
)
df.select(
    "*", 
    F.rank().over(w).alias("rank")
).show()

+-------------------+-----+-----+----+
|                 ts|value|group|rank|
+-------------------+-----+-----+----+
|2021-05-01 10:30:10| 2.12|    a|   1|
|2021-05-01 10:30:20| 2.13|    a|   2|
|2021-05-01 10:31:05| 2.13|    a|   2|
|2021-05-01 10:30:50| 2.14|    a|   4|
|2021-05-01 10:30:00| 2.15|    a|   5|
|2021-05-01 10:31:10| 2.16|    b|   1|
|2021-05-01 10:31:11| 2.17|    b|   2|
+-------------------+-----+-----+----+

Upvotes: 0
