BHC

Reputation: 77

How to get the maximum row_number in a window in a Spark dataframe

I have a dataframe that looks like the one below, which I get using the code shown after it:

+---+----------+--------+----------+
|EK |Value     |date    |row_number|
+---+----------+--------+----------+
|5  |100       |1/1/2020|1         |
|5  |150       |1/3/2020|2         |
|5  |175       |1/5/2020|3         |
|62 |200       |2/9/1999|1         |
|62 |21        |9/2/2000|2         | 
+---+----------+--------+----------+

from pyspark.sql import Window
from pyspark.sql.functions import row_number

window = Window.partitionBy("EK").orderBy("date")

df = df.withColumn("row_number", row_number().over(window))

The expected result is to keep only the row with the maximum row_number in every window, as shown below:

+---+----------+--------+----------+
|EK |Value     |date    |row_number|
+---+----------+--------+----------+
|5  |175       |1/5/2020|3         |
|62 |21        |9/2/2000|2         | 
+---+----------+--------+----------+

Thank you.

Upvotes: 0

Views: 3418

Answers (2)

Lamanus

Reputation: 13541

You can use one more window to get the last value.

from pyspark.sql import functions as f
from pyspark.sql import Window

w1 = Window.partitionBy('EK').orderBy('date')  # ordered window for row_number
w2 = Window.partitionBy('EK')                  # unordered window covering the whole partition

df.withColumn('row_number', f.row_number().over(w1)) \
  .withColumn('last', f.last('row_number').over(w2)) \
  .filter('row_number = last') \
  .show(truncate=False)

+---+-----+--------+----------+----+
|EK |Value|date    |row_number|last|
+---+-----+--------+----------+----+
|5  |175  |1/5/2020|3         |3   |
|62 |21   |9/2/2000|2         |2   |
+---+-----+--------+----------+----+

If you don't care about the row_number,

from pyspark.sql import functions as f
from pyspark.sql import Window

w = Window.partitionBy('EK')

df.withColumn('last', f.last('date').over(w)) \
  .filter('date = last').show(truncate=False)

+---+-----+--------+--------+
|EK |Value|date    |last    |
+---+-----+--------+--------+
|5  |175  |1/5/2020|1/5/2020|
|62 |21   |9/2/2000|9/2/2000|
+---+-----+--------+--------+

and then drop the last column.
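A minimal sketch of that final step, reusing the code above with drop('last') simply chained in before show:

from pyspark.sql import functions as f
from pyspark.sql import Window

w = Window.partitionBy('EK')

# keep the latest row per EK, then remove the helper column
df.withColumn('last', f.last('date').over(w)) \
  .filter('date = last') \
  .drop('last') \
  .show(truncate=False)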

Upvotes: 4

Kafels

Reputation: 4059

You can create a temporary column with the maximum row number per partition, then filter on it and drop it:

from pyspark.sql import Window
from pyspark.sql.functions import col, max, row_number


window = Window.partitionBy("EK").orderBy("date")

df = df.withColumn("row_number", row_number().over(window))
df = (df
      .withColumn('max_row_number', max('row_number').over(Window.partitionBy("EK")))
      .where(col('row_number') == col('max_row_number'))
      .drop('max_row_number'))

df.show(truncate=False)

Upvotes: 2
