Reputation: 77
I have a dataframe that looks as shown below, and I'm using the code below to get it:
+---+----------+--------+----------+
|EK |Value |date |row_number|
+---+----------+--------+----------+
|5 |100 |1/1/2020|1 |
|5 |150 |1/3/2020|2 |
|5 |175 |1/5/2020|3 |
|62 |200 |2/9/1999|1 |
|62 |21 |9/2/2000|2 |
+---+----------+--------+----------+
from pyspark.sql import Window
from pyspark.sql.functions import row_number
window = Window.partitionBy("EK").orderBy("date")
df = df.withColumn("row_number", row_number().over(window))
The expected result is to keep only the row with the maximum row number in every window, as shown below:
+---+----------+--------+----------+
|EK |Value |date |row_number|
+---+----------+--------+----------+
|5 |175 |1/5/2020|3 |
|62 |21 |9/2/2000|2 |
+---+----------+--------+----------+
Thank you.
Upvotes: 0
Views: 3418
Reputation: 13541
You can use one more window to get the last value.
from pyspark.sql import functions as f
from pyspark.sql import Window

w1 = Window.partitionBy('EK').orderBy('date')
w2 = Window.partitionBy('EK')

# Number the rows per EK by date, take the last row number in each
# partition, and keep only the rows where the two match.
df.withColumn('row_number', f.row_number().over(w1)) \
  .withColumn('last', f.last('row_number').over(w2)) \
  .filter('row_number = last') \
  .show(truncate=False)
+---+-----+--------+----------+----+
|EK |Value|date |row_number|last|
+---+-----+--------+----------+----+
|5 |175 |1/5/2020|3 |3 |
|62 |21 |9/2/2000|2 |2 |
+---+-----+--------+----------+----+
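One caveat, not from the original answer: w2 has no orderBy, so which row f.last treats as the last one depends on the physical row order inside the partition. A minimal sketch of a deterministic variant, ordering the window and widening its frame to the whole partition:
from pyspark.sql import functions as f
from pyspark.sql import Window

w1 = Window.partitionBy('EK').orderBy('date')

# With an explicit full-partition frame on an ordered window,
# f.last always returns the highest row number per EK.
w2 = Window.partitionBy('EK').orderBy('date') \
    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

df.withColumn('row_number', f.row_number().over(w1)) \
  .withColumn('last', f.last('row_number').over(w2)) \
  .filter('row_number = last') \
  .show(truncate=False)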
If you don't care about the row_number column, you can compare the date directly:
from pyspark.sql import functions as f
from pyspark.sql import Window

w = Window.partitionBy('EK')

# Keep only the row whose date equals the last date in its partition.
df.withColumn('last', f.last('date').over(w)) \
  .filter('date = last').show(truncate=False)
+---+-----+--------+--------+
|EK |Value|date |last |
+---+-----+--------+--------+
|5 |175 |1/5/2020|1/5/2020|
|62 |21 |9/2/2000|9/2/2000|
+---+-----+--------+--------+
and drop the last column afterwards.
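Also note that date is a string column here, so both the window ordering and the date = last comparison are string-based; that happens to work for the sample data, but not in general. A sketch that compares chronologically instead, assuming the M/d/yyyy pattern from the sample data:
from pyspark.sql import functions as f
from pyspark.sql import Window

w = Window.partitionBy('EK')

# Parse the M/d/yyyy strings so max picks the chronologically
# latest date, then keep the matching row in each partition.
df.withColumn('parsed', f.to_date('date', 'M/d/yyyy')) \
  .withColumn('max_date', f.max('parsed').over(w)) \
  .filter('parsed = max_date') \
  .drop('parsed', 'max_date') \
  .show(truncate=False)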
Upvotes: 4
Reputation: 4059
You can create a temporary column with the maximum row number per partition, then filter on it and drop it:
from pyspark.sql import Window
from pyspark.sql.functions import col, max, row_number  # this max shadows Python's builtin max

window = Window.partitionBy("EK").orderBy("date")
df = df.withColumn("row_number", row_number().over(window))

# Compute the per-partition maximum row number, keep the matching
# rows, then drop the helper column.
df = (df
      .withColumn('max_row_number', max('row_number').over(Window.partitionBy("EK")))
      .where(col('row_number') == col('max_row_number'))
      .drop('max_row_number'))
df.show(truncate=False)
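With the sample dataframe above, this should print the same two rows as the first answer:
+---+-----+--------+----------+
|EK |Value|date    |row_number|
+---+-----+--------+----------+
|5  |175  |1/5/2020|3         |
|62 |21   |9/2/2000|2         |
+---+-----+--------+----------+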
Upvotes: 2