M_S

Reputation: 3763

Spark window aggregate function not working intuitively with records ordering

I have the following example, which I am running on Spark 3.3:

import pyspark.sql.functions as F
from pyspark.sql import Window

inputData = [
  ("1", 333),
  ("1", 222),
  ("1", 111),
  ("2", 334)
]
inputDf = spark.createDataFrame(inputData, schema=["id", "val"])

window = Window.partitionBy("id")
aggregatedDf = (
    inputDf.withColumn("min_val", F.min(F.col("val")).over(window))
    .withColumn("max_val", F.max(F.col("val")).over(window))
)
aggregatedDf.show()

The output is as expected; I am getting the correct min/max value for each window:

+---+---+-------+-------+
| id|val|min_val|max_val|
+---+---+-------+-------+
|  1|333|    111|    333|
|  1|222|    111|    333|
|  1|111|    111|    333|
|  2|334|    334|    334|
+---+---+-------+-------+

When I add an orderBy to the window, the output is different:

window = Window.partitionBy("id").orderBy(F.col("val").desc())

+---+---+-------+-------+
| id|val|min_val|max_val|
+---+---+-------+-------+
|  1|333|    333|    333|
|  1|222|    222|    333|
|  1|111|    111|    333|
|  2|334|    334|    334|
+---+---+-------+-------+

As you can see, with descending ordering max_val is fine, but min_val changes from record to record.

I tried to find more information in the docs or here on SO, but no luck. To me it's not intuitive at all.

My expectation was that Spark would scan all records in a given partition and assign the min/max value to every record within that partition. That is what happens without ordering in the window, but it works differently once ordering is added.

Does anyone know why it works like this?

Upvotes: 3

Views: 647

Answers (1)

Ronak Jain

Reputation: 3358

You need to add a frame specification to get the output you expect.

As per Docs:

Note When ordering is not defined, an unbounded window frame (rowFrame, unboundedPreceding, unboundedFollowing) is used by default. When ordering is defined, a growing window frame (rangeFrame, unboundedPreceding, currentRow) is used by default.

Essentially, when ordering is defined, Spark (like any SQL engine) by default only considers the window up to the current row while evaluating the function for that row. By specifying the frame as unboundedPreceding to unboundedFollowing, we ask Spark to consider the whole partition instead.

For example, while processing the min function for the second row of your dataframe (ordered by val descending), Spark considers the window for id=1 to be only the first and second rows (between unboundedPreceding and currentRow), which is why min_val is 222 for that row.
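To see that this really is the default, here is a small sketch that spells the growing frame out explicitly (assuming the same spark session and inputDf as in the question; the growing_window name is just for illustration). It should reproduce the per-row min_val from the question:

import pyspark.sql.functions as F
from pyspark.sql import Window

# Explicitly spell out the frame Spark uses by default when orderBy is present:
# a range frame from unboundedPreceding to the current row.
growing_window = (
    Window.partitionBy("id")
    .orderBy(F.col("val").desc())
    .rangeBetween(Window.unboundedPreceding, Window.currentRow)
)

# min_val should again change row by row for id=1 (333, 222, 111),
# while max_val stays at 333, matching the second output in the question.
inputDf.withColumn("min_val", F.min("val").over(growing_window)) \
    .withColumn("max_val", F.max("val").over(growing_window)) \
    .show()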

This would work

window = Window.partitionBy("id")\
.orderBy(F.col("val").desc())\
.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
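Applied with the same min/max expressions as in the question (a sketch, reusing F and inputDf defined there), every row now gets the partition-wide values:

(
    inputDf
    .withColumn("min_val", F.min(F.col("val")).over(window))  # min over the whole partition
    .withColumn("max_val", F.max(F.col("val")).over(window))  # max over the whole partition
    .show()
)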

Output:

+---+---+-------+-------+
| id|val|min_val|max_val|
+---+---+-------+-------+
|  1|333|    111|    333|
|  1|222|    111|    333|
|  1|111|    111|    333|
|  2|334|    334|    334|
+---+---+-------+-------+

To understand more about window frames, consider reading https://docs.oracle.com/cd/E17952_01/mysql-8.0-en/window-functions-frames.html

https://medium.com/expedia-group-tech/deep-dive-into-apache-spark-window-functions-7b4e39ad3c86

Upvotes: 3
