Vladimir Shadrin

Reputation: 376

Do Spark Window functions work independently per partition?

I'm trying to get the latest row per day for each some_guid. For example, I have the following data, sorted by item_time in descending order:

+----------+--------------------+-------------+
| file_date|           some_guid|    item_time|
+----------+--------------------+-------------+
|2021-11-22|22549ca165d88ffd2...|1637562345493|
|2021-11-22|22549ca165d88ffd2...|1632637545493|
|2021-11-22|22549ca165d88ffd2...|1632723945493|
|2021-11-22|22549ca165d88ffd2...|1632810345493|
|2021-11-22|22549ca165d88ffd2...|1632896745493|
|2021-11-22|22549ca165d88ffd2...|1632983145493|
|2021-11-22|22549ca165d88ffd2...|1633069545493|
|2021-11-22|22549ca165d88ffd2...|1633155945493|
|2021-11-22|22549ca165d88ffd2...|1633242345493|
|2021-11-22|22549ca165d88ffd2...|1633328745493|
|2021-11-22|22549ca165d88ffd2...|1633415145493|
|2021-11-22|22549ca165d88ffd2...|1633501545493|
|2021-11-22|22549ca165d88ffd2...|1633587945493|
|2021-11-22|22549ca165d88ffd2...|1633674345493|
|2021-11-22|22549ca165d88ffd2...|1633760745493|
|2021-11-22|22549ca165d88ffd2...|1633847145493|

As you can see, all item_time values are different. Then I apply the following transformation:

from pyspark.sql import Window
from pyspark.sql.functions import col, first
daily_window = Window.partitionBy('file_date', 'some_guid').orderBy(col('item_time').desc())
df.select('file_date', 'some_guid', first('item_time').over(daily_window).alias('item_time'))

And get the following result:

+----------+--------------------+-------------+
| file_date|           some_guid|    item_time|
+----------+--------------------+-------------+
|2021-11-22|22549ca165d88ffd2...|1637562345493|
|2021-11-22|22549ca165d88ffd2...|1637562345493|
|2021-11-22|22549ca165d88ffd2...|1637562345493|
|2021-11-22|22549ca165d88ffd2...|1637562345493|
|2021-11-22|22549ca165d88ffd2...|1637562345493|
|2021-11-22|22549ca165d88ffd2...|1637562345493|
|2021-11-22|22549ca165d88ffd2...|1637562345493|
|2021-11-22|22549ca165d88ffd2...|1637562345493|
|2021-11-22|22549ca165d88ffd2...|1637562345493|
|2021-11-22|22549ca165d88ffd2...|1637562345493|
|2021-11-22|22549ca165d88ffd2...|1637562345493|
|2021-11-22|22549ca165d88ffd2...|1637562345493|

There are many duplicates, but I'm expecting only one row. Why does this happen? Is the window function applied within each partition, producing the same row, which is then printed as many times as there are partitions?

UPD:

How can I do the same if I have a fourth column? For example, how do I select the last row in the following dataset:

+----------+--------------------+-------------+------+
| file_date|           some_guid|    item_time|  col4|
+----------+--------------------+-------------+------+
|2021-11-22|22549ca165d88ffd2...|1632562345493| data1|
|2021-11-22|22549ca165d88ffd2...|1632637545493| data2|
|2021-11-22|22549ca165d88ffd2...|1632723945493| data3|
|2021-11-22|22549ca165d88ffd2...|1632810345493| data4|

And I need to select the last row, with 'data4' in col4.

Upvotes: 1

Views: 1144

Answers (3)

tianzhipeng

Reputation: 2209

Use row_number followed by a filter:

daily_window = Window.partitionBy('file_date', 'some_guid').orderBy(col('item_time').desc())
df.select(
    'file_date','some_guid','col4',
    row_number().over(daily_window).alias('rn')
  ).filter("rn = 1")
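A variant of the same idea, sketched with explicit imports (the latest variable name is just illustrative): withColumn keeps all of the original columns, including col4 from the update, and the helper rn column is dropped afterwards.

from pyspark.sql import Window
from pyspark.sql.functions import col, row_number

daily_window = Window.partitionBy('file_date', 'some_guid').orderBy(col('item_time').desc())

# rn == 1 marks the row with the latest item_time in each (file_date, some_guid) group
latest = (
    df.withColumn('rn', row_number().over(daily_window))
      .filter(col('rn') == 1)
      .drop('rn')
)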

Upvotes: 3

clairtonm

Reputation: 51

You are aggregating by file_date and some_guid, and looking at your data you have only one group:

file_date  some_guid
2021-11-22 22549ca165d88ffd2...

(we can't see the rest of some_guid, but it appears to be the same for all rows)

So it applies the first value to all of the rows. Up to this point, the behaviour is correct.

But I would recommend you try withColumn() instead of select:

df.withColumn('item_time', first('item_time').over(daily_window))
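For context, a self-contained sketch of that approach (the df_latest_time name is just illustrative): it keeps every input row and only overwrites item_time with the group's latest value, which is exactly why the question's output had as many identical rows as there were input rows.

from pyspark.sql import Window
from pyspark.sql.functions import col, first

daily_window = Window.partitionBy('file_date', 'some_guid').orderBy(col('item_time').desc())

# The row count is unchanged; only item_time is replaced by the latest value of its group.
df_latest_time = df.withColumn('item_time', first('item_time').over(daily_window))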

Edit:

If you expect only one row, you want to use groupBy. See the previous answer: https://stackoverflow.com/a/70081054/13960095

Window functions are for when you want all the rows of a group to have a calculated value based on the group's values.

Upvotes: 2

Raphael Roth

Reputation: 27373

Use groupBy:

df.groupBy('file_date','some_guid').agg(max('item_time'))

Or use window functions (i.e. rank/row_number) to enumerate the records, then use where/filter to select the desired records.
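If you also need the remaining columns of the winning row (col4 from the update), one common trick, sketched below with pyspark.sql.functions imported as F, is to aggregate a struct whose first field is item_time: max compares structs field by field, so the whole latest row comes back from a single aggregation.

from pyspark.sql import functions as F

latest = (
    df.groupBy('file_date', 'some_guid')
      .agg(F.max(F.struct('item_time', 'col4')).alias('latest'))
      # unpack the struct back into ordinary columns
      .select('file_date', 'some_guid', 'latest.item_time', 'latest.col4')
)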

Upvotes: 1
