Reputation: 3763
I have the following example, which I am running on Spark 3.3:
import pyspark.sql.functions as F
from pyspark.sql import Window

inputData = [
    ("1", 333),
    ("1", 222),
    ("1", 111),
    ("2", 334),
]
inputDf = spark.createDataFrame(inputData, schema=["id", "val"])

# window partitioned by id, no ordering
window = Window.partitionBy("id")

aggregatedDf = (
    inputDf.withColumn("min_val", F.min(F.col("val")).over(window))
    .withColumn("max_val", F.max(F.col("val")).over(window))
)
aggregatedDf.show()
The output is as expected; I am getting the correct min/max value for each window:
+---+---+-------+-------+
| id|val|min_val|max_val|
+---+---+-------+-------+
|  1|333|    111|    333|
|  1|222|    111|    333|
|  1|111|    111|    333|
|  2|334|    334|    334|
+---+---+-------+-------+
When I add orderBy to the window, the output is different:
window = Window.partitionBy("id").orderBy(F.col("val").desc())
+---+---+-------+-------+
| id|val|min_val|max_val|
+---+---+-------+-------+
|  1|333|    333|    333|
|  1|222|    222|    333|
|  1|111|    111|    333|
|  2|334|    334|    334|
+---+---+-------+-------+
As you can see, with descending ordering max_val is fine, but min_val changes from record to record.
I tried to find more information in the docs or here on SO, but no luck. To me it's not intuitive at all.
My expectation was that Spark would scan all records in a given partition and assign the min/max value to every record within that partition. That is what happens without ordering in the window, but it works differently once ordering is added.
Does anyone know why it works like this?
Upvotes: 3
Views: 647
Reputation: 3358
You need to add a frame specification to the window to get the output you expect.
As per the docs:
Note When ordering is not defined, an unbounded window frame (rowFrame, unboundedPreceding, unboundedFollowing) is used by default. When ordering is defined, a growing window frame (rangeFrame, unboundedPreceding, currentRow) is used by default.
Essentially, when ordering is defined, Spark (or any SQL engine) will by default only consider the window up to the current row while processing the function for that row. By setting the frame to unboundedPreceding to unboundedFollowing, we ask Spark to consider the whole window instead.
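To make that default concrete, here is a minimal sketch (reusing inputDf and the imports from the question) that spells out the growing frame Spark applies implicitly when orderBy is present:

# Window.partitionBy("id").orderBy(...) with no explicit frame is equivalent to
# declaring the default growing frame (rangeFrame, unboundedPreceding, currentRow):
explicit_default = (
    Window.partitionBy("id")
    .orderBy(F.col("val").desc())
    .rangeBetween(Window.unboundedPreceding, Window.currentRow)
)

# Reproduces the "changing" min_val from the question: each row's frame only
# covers the rows from the start of the partition up to that row.
inputDf.withColumn("min_val", F.min("val").over(explicit_default)).show()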
For example, while processing the min function for the second row in your dataframe (ordered by val in descending order), Spark only considers the first and second rows of the window for id=1 (between unboundedPreceding and currentRow), which is why min_val is 222 for that row.
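If you want to watch the frame grow row by row, one illustrative trick (again reusing inputDf from the question) is to collect the frame contents for every row:

# collect_list over the ordered window returns exactly the rows that fall into
# each row's default frame, so you can see the frame grow within the partition.
ordered_window = Window.partitionBy("id").orderBy(F.col("val").desc())

inputDf.withColumn(
    "frame_contents", F.collect_list("val").over(ordered_window)
).show(truncate=False)
# For id=1 the frames are [333], [333, 222], [333, 222, 111], which is why
# min_val tracks the current row while max_val stays at 333.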
This would work:
window = Window.partitionBy("id")\
    .orderBy(F.col("val").desc())\
    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
Output:
+---+---+-------+-------+
| id|val|min_val|max_val|
+---+---+-------+-------+
|  1|333|    111|    333|
|  1|222|    111|    333|
|  1|111|    111|    333|
|  2|334|    334|    334|
+---+---+-------+-------+
To understand more about frames, consider reading:
https://docs.oracle.com/cd/E17952_01/mysql-8.0-en/window-functions-frames.html
https://medium.com/expedia-group-tech/deep-dive-into-apache-spark-window-functions-7b4e39ad3c86
Upvotes: 3