Tom

Reputation: 6332

Behavior of Spark SQL window order by

I have a simple table with three columns: depName, empNo, and salary. When I run the following window query

val ws = Window.partitionBy("depName").orderBy("empNo")

ds.withColumn("avg", avg("salary") over ws).show()

it outputs the result below. It shows that the salary average within each depName runs from the first row up to the current row. How can this happen? I thought every avg within one depName should be the same.

If I don't use orderBy("empNo") to create the ws, then all the avg values within one depName are the same.

Could someone explain why this happens? Thanks.
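For reference, here is a minimal, self-contained reproduction (the SparkSession setup and the sample data literals are my own additions; the values match the output table below):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.avg

// Hypothetical local session just to make the snippet runnable.
val spark = SparkSession.builder().master("local[*]").appName("window-avg").getOrCreate()
import spark.implicits._

// Sample data reconstructed from the output shown below.
val ds = Seq(
  ("develop", 7, 4200), ("develop", 8, 6000), ("develop", 9, 4500),
  ("develop", 10, 5200), ("develop", 11, 5200),
  ("sales", 1, 5000), ("sales", 3, 4800), ("sales", 4, 4800),
  ("personnel", 2, 3900), ("personnel", 5, 3500)
).toDF("depName", "empNo", "salary")

val ws = Window.partitionBy("depName").orderBy("empNo")
ds.withColumn("avg", avg("salary") over ws).show()
```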

+---------+-----+------+-----------------+
|  depName|empNo|salary|              avg|
+---------+-----+------+-----------------+
|  develop|    7|  4200|           4200.0|
|  develop|    8|  6000|           5100.0|
|  develop|    9|  4500|           4900.0|
|  develop|   10|  5200|           4975.0|
|  develop|   11|  5200|           5020.0|
|    sales|    1|  5000|           5000.0|
|    sales|    3|  4800|           4900.0|
|    sales|    4|  4800|4866.666666666667|
|personnel|    2|  3900|           3900.0|
|personnel|    5|  3500|           3700.0|
+---------+-----+------+-----------------+

Upvotes: 1

Views: 1686

Answers (3)

Sanchez

Reputation: 51

As @Gladiator explained, you can achieve this by specifying different frame bounds (so that the running avg equals the total avg).

This gives the same value for every row within a window partition, while the rows are still ordered by empNo.

val ws = Window.partitionBy("depName")
      .orderBy("empNo")
      .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

ds.withColumn("avg", avg("salary") over ws).show(false)

The result:

+---------+-----+------+-----------------+
|depName  |empNo|salary|avg              |
+---------+-----+------+-----------------+
|develop  |7    |4200  |5020.0           |
|develop  |8    |6000  |5020.0           |
|develop  |9    |4500  |5020.0           |
|develop  |10   |5200  |5020.0           |
|develop  |11   |5200  |5020.0           |
|sales    |1    |5000  |4866.666666666667|
|sales    |3    |4800  |4866.666666666667|
|sales    |4    |4800  |4866.666666666667|
|personnel|2    |3900  |3700.0           |
|personnel|5    |3500  |3700.0           |
+---------+-----+------+-----------------+

Upvotes: 0

Sarath Subramanian

Reputation: 21271

Can you try the syntax below? It works as you expected: the average is computed over a window partitioned only by depName (no ordering inside the window), and the result is then ordered by empNo.

df.withColumn("avg_Time", avg($"salary").over(Window.partitionBy($"depName"))).orderBy("empNo").show()


Upvotes: 0

Gladiator

Reputation: 644

The AVG() window function operates on the rows defined by the window frame and returns a value for each row. In contrast, a plain aggregate query with AVG() returns a single row containing the average of all values in the specified column, rather than a value per row.

The PARTITION BY clause subdivides the window into partitions. The ORDER BY clause defines the logical order of the rows within each partition. The window function is applied to each row as it is returned after ordering within its partition, and with ORDER BY present the default frame only extends up to the current row. That is why you get a running average rather than a total average.

As per the Spark source documentation on GitHub:

  • @note When ordering is not defined, an unbounded window frame (rowFrame, unboundedPreceding, unboundedFollowing) is used by default. When ordering is defined, a growing window frame (rangeFrame, unboundedPreceding, currentRow) is used by default.

https://github.com/apache/spark/blob/1d95dea30788b9f64c5e304d908b85936aafb238/sql/core/src/main/scala/org/apache/spark/sql/expressions/Window.scala#L36
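The two defaults quoted above can be spelled out explicitly. Writing the frames by hand reproduces both behaviours side by side (a sketch reusing the `ds` DataFrame and column names from the question):

```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.avg

// With ordering, the implicit default is a growing range frame;
// making it explicit yields the same running average as the question's query.
val runningWs = Window.partitionBy("depName").orderBy("empNo")
  .rangeBetween(Window.unboundedPreceding, Window.currentRow)

// Without ordering, the implicit default is an unbounded frame;
// every row in a partition then gets the full-partition average.
val totalWs = Window.partitionBy("depName")
  .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

ds.withColumn("running_avg", avg("salary") over runningWs)
  .withColumn("total_avg",   avg("salary") over totalWs)
  .show()
```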

Upvotes: 4
