Reputation: 53
I have a dataframe like this:
import datetime as dt

df = spark.createDataFrame(
    [
        (dt.datetime(2021, 5, 1, 10, 30, 0), 2.15, "a"),
        (dt.datetime(2021, 5, 1, 10, 30, 10), 2.12, "a"),
        (dt.datetime(2021, 5, 1, 10, 30, 20), 2.13, "a"),
        (dt.datetime(2021, 5, 1, 10, 30, 50), 2.14, "a"),
        (dt.datetime(2021, 5, 1, 10, 31, 5), 2.13, "a"),
        (dt.datetime(2021, 5, 1, 10, 31, 10), 2.16, "a"),
        (dt.datetime(2021, 5, 1, 10, 31, 10), 2.16, "b"),
    ],
    ["ts", "value", "group"]
)
I want to get the rank of the value column, using all the previous values (ordered by the timestamp ts). For example:
+-------------------+-----+-----+----+
| ts|value|group|rank|
+-------------------+-----+-----+----+
|2021-05-01 10:30:00| 2.15| a| 1|
|2021-05-01 10:30:10| 2.12| a| 1|
|2021-05-01 10:30:20| 2.13| a| 2|
|2021-05-01 10:30:50| 2.14| a| 3|
|2021-05-01 10:31:05| 2.13| a| 2|
|2021-05-01 10:31:10| 2.16| a| 5|
|2021-05-01 10:31:10| 2.16| b| 1|
+-------------------+-----+-----+----+
I tried the following code:
from pyspark.sql import Window, functions as f

w = (
    Window
    .partitionBy("group")
    .orderBy("ts")
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)
df.select(
    "*",
    f.rank().over(w).alias("rank")
).show()
but it basically ranks the rows only by the timestamp.
Any idea how to do it?
Upvotes: 2
Views: 977
Reputation: 2939
The rank function ranks rows by the window's orderBy clause, so you cannot use it to rank by another column. As an alternative, you can collect the values seen so far, sort them, and take the position of the current value:
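from pyspark.sql import functions as F
# w is the window from the question (ordered by ts, unbounded preceding to current row)
df = (df
    # sorted distinct values seen so far, then the 1-based position of the current value
    .withColumn("rank", F.array_sort(F.collect_set('value').over(w)))
    .withColumn('rank', F.expr("array_position(rank, value)")))
df.show()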
+-------------------+-----+-----+----+
| ts|value|group|rank|
+-------------------+-----+-----+----+
|2021-05-01 10:31:10| 2.16| b| 1|
|2021-05-01 10:30:00| 2.15| a| 1|
|2021-05-01 10:30:10| 2.12| a| 1|
|2021-05-01 10:30:20| 2.13| a| 2|
|2021-05-01 10:30:50| 2.14| a| 3|
|2021-05-01 10:31:05| 2.13| a| 2|
|2021-05-01 10:31:10| 2.16| a| 5|
+-------------------+-----+-----+----+
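Because collect_set drops duplicates, this behaves like dense_rank. If you want standard rank semantics (gaps after ties), use collect_list instead.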
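To sanity-check the expected numbers without a Spark session, here is a minimal plain-Python sketch of the same running-rank idea. It is not PySpark, and the names `running_ranks` and `dense` are hypothetical, chosen only for illustration. It assumes the values are already ordered by ts within one group.

```python
from bisect import bisect_left, insort


def running_ranks(values, dense=False):
    """Rank each value among itself and all earlier values (ascending, 1-based).

    dense=True keeps only distinct values, like sorting a collect_set;
    dense=False keeps duplicates, like sorting a collect_list.
    """
    seen = []  # sorted values seen so far
    ranks = []
    for v in values:
        i = bisect_left(seen, v)  # index of the first element >= v
        # In dense mode, do not insert a value that is already present.
        if not (dense and i < len(seen) and seen[i] == v):
            insort(seen, v)
        ranks.append(i + 1)  # first position of v in the sorted prefix
    return ranks


# Group "a" from the question, ordered by ts.
group_a = [2.15, 2.12, 2.13, 2.14, 2.13, 2.16]
dense_a = running_ranks(group_a, dense=True)
plain_a = running_ranks(group_a, dense=False)
print(dense_a)
print(plain_a)
```

The two variants differ only on the last row: the dense version gives 5 for 2.16, as in the output above, while the version that keeps duplicates gives 6.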
Upvotes: 2
Reputation: 2003
Change the orderBy() column to value:
import datetime as dt
from pyspark.sql import Window as W, functions as F

df = spark.createDataFrame(
    [
        (dt.datetime(2021, 5, 1, 10, 30, 0), 2.15, "a"),
        (dt.datetime(2021, 5, 1, 10, 30, 10), 2.12, "a"),
        (dt.datetime(2021, 5, 1, 10, 30, 20), 2.13, "a"),
        (dt.datetime(2021, 5, 1, 10, 30, 50), 2.14, "a"),
        (dt.datetime(2021, 5, 1, 10, 31, 5), 2.13, "a"),
        (dt.datetime(2021, 5, 1, 10, 31, 10), 2.16, "b"),
        (dt.datetime(2021, 5, 1, 10, 31, 11), 2.17, "b"),
    ],
    ["ts", "value", "group"]
)
# Rank by value within each group
w = (
    W
    .partitionBy("group")
    .orderBy("value")
)
df.select(
    "*",
    F.rank().over(w).alias("rank")
).show()
+-------------------+-----+-----+----+
| ts|value|group|rank|
+-------------------+-----+-----+----+
|2021-05-01 10:30:10| 2.12| a| 1|
|2021-05-01 10:30:20| 2.13| a| 2|
|2021-05-01 10:31:05| 2.13| a| 2|
|2021-05-01 10:30:50| 2.14| a| 4|
|2021-05-01 10:30:00| 2.15| a| 5|
|2021-05-01 10:31:10| 2.16| b| 1|
|2021-05-01 10:31:11| 2.17| b| 2|
+-------------------+-----+-----+----+
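Note that a window ordered by value ranks each row against the whole group, not only against rows with earlier timestamps. A small plain-Python sketch of that whole-partition ranking, using group a from the data above (`partition_ranks` is a hypothetical helper name, not a Spark function):

```python
def partition_ranks(values):
    """Standard rank of each value within the whole list (ascending, 1-based)."""
    ordered = sorted(values)
    # list.index returns the first occurrence, so ties share the lowest rank
    return [ordered.index(v) + 1 for v in values]


# Group "a" from this answer's data, listed in ts order.
group_a = [2.15, 2.12, 2.13, 2.14, 2.13]
whole = partition_ranks(group_a)
print(whole)
```

Here the first row (2.15 at 10:30:00) gets rank 5, which matches the table above, whereas the question's expected output gives it rank 1 because no earlier values exist at that point.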
Upvotes: 0