Reputation:
This is a question about Window Functions in Spark.
Assume I have this DF
DATE_S | ID | STR | VALUE
-------------------------
1 | 1 | A | 0.5
1 | 1 | A | 1.23
1 | 1 | A | -0.4
2 | 1 | A | 2.0
3 | 1 | A | -1.2
3 | 1 | A | 0.523
1 | 2 | A | 1.0
2 | 2 | A | 2.5
3 | 2 | A | 1.32
3 | 2 | A | -3.34
1 | 1 | B | 1.5
1 | 1 | B | 0.23
1 | 1 | B | -0.3
2 | 1 | B | -2.0
3 | 1 | B | 1.32
3 | 1 | B | 523.0
1 | 2 | B | 1.3
2 | 2 | B | -0.5
3 | 2 | B | 4.3243
3 | 2 | B | 3.332
This is just an example! Assume that there are many more DATE_S values for each (ID, STR), many more IDs and STRs, and many more entries per (DATE_S, ID, STR). There are, of course, multiple values per combination (DATE_S, ID, STR).
Now I do this:
val w = Window.partitionBy("ID", "STR").orderBy("DATE_S").rangeBetween(-N, -1)
df.withColumn("RESULT", function("VALUE").over(w))
where N might lead to the inclusion of a large range of rows, from 100 to 100000 and more, depending on ("ID", "STR").
The result will be something like this
DATE_S | ID | STR | VALUE | RESULT
----------------------------------
1 | 1 | A | 0.5 | R1
1 | 1 | A | 1.23 | R1
1 | 1 | A | -0.4 | R1
2 | 1 | A | 2.0 | R2
3 | 1 | A | -1.2 | R3
3 | 1 | A | 0.523 | R3
1 | 2 | A | 1.0 | R4
2 | 2 | A | 2.5 | R5
3 | 2 | A | 1.32 | R6
3 | 2 | A | -3.34 | R6
1 | 1 | B | 1.5 | R8
1 | 1 | B | 0.23 | R8
1 | 1 | B | -0.3 | R8
2 | 1 | B | -2.0 | R10
3 | 1 | B | 1.32 | R11
3 | 1 | B | 523.0 | R11
1 | 2 | B | 1.3 | R12
2 | 2 | B | -0.5 | R13
3 | 2 | B | 4.3243 | R14
3 | 2 | B | 3.332 | R14
There are identical "RESULT"s because, for every row with identical (DATE_S, ID, STR), the values that go into the calculation of "function" are the same.
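To make that concrete, here is a minimal plain-Scala model of the frame selected by rangeBetween(-N, -1). It is not Spark code and says nothing about how Spark evaluates the window internally; `Rec`, `RangeFrameModel` and `frame` are hypothetical names introduced only for this sketch, and the sample rows are the (ID = 1, STR = "A") rows from the table above.

```scala
// Plain-Scala model of a range frame (illustration only, not Spark's implementation).
final case class Rec(dateS: Int, id: Int, str: String, value: Double)

object RangeFrameModel {
  // Values in the frame rangeBetween(-n, -1) for `row`:
  // same (id, str) partition, dateS within [row.dateS - n, row.dateS - 1].
  def frame(rows: Seq[Rec], row: Rec, n: Int): Seq[Double] =
    rows
      .filter(r => r.id == row.id && r.str == row.str)
      .filter(r => r.dateS >= row.dateS - n && r.dateS <= row.dateS - 1)
      .map(_.value)

  // The (ID = 1, STR = "A") rows from the example table.
  val sample: Seq[Rec] = Seq(
    Rec(1, 1, "A", 0.5), Rec(1, 1, "A", 1.23), Rec(1, 1, "A", -0.4),
    Rec(2, 1, "A", 2.0),
    Rec(3, 1, "A", -1.2), Rec(3, 1, "A", 0.523)
  )

  def main(args: Array[String]): Unit = {
    val n = 2 // hypothetical look-back distance
    // Rows sharing (DATE_S, ID, STR) print the same frame.
    sample.foreach(r => println(s"$r -> ${frame(sample, r, n)}"))
  }
}
```

Because the frame depends only on the partition key and DATE_S, the two DATE_S = 3 rows see exactly the same input values, so any "function" applied to the frame gives them the same result.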
My question is this:
Does Spark call "function" for each row (recalculating the same value multiple times), or does it calculate the value once per range (frame?) of values and just paste it onto all rows that fall in the range?
Thanks for reading :)
Upvotes: 1
Views: 150
Reputation: 18053
From what I can see in your data, the result may not be the same if run twice, because the ordering is not distinct. But let's leave that aside.
Although there is codegen optimization, I have found nothing to indicate that Spark checks, in the way you describe, whether the next row's invocation would process the same set of data. I have never read of that type of optimization. There is fusing due to the lazy evaluation approach, but that is another matter. So the value is calculated again for each row.
From a great source: https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-functions-windows.html
... At its core, a window function calculates a return value for every input row of a table based on a group of rows, called the frame. Every input row can have a unique frame associated with it. ...
... In other words, when executed, a window function computes a value for each and every row in a window (per window specification). ...
The biggest issue is having a suitable number of partitions for parallel processing, which is expensive, but this is big data. partitionBy("ID", "STR") is the key here, and that is a good thing.
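If the repeated per-row evaluation is a concern, one possible workaround is to reduce the data to one row per (ID, STR, DATE_S) before applying the window, then join the result back. The following is only a sketch under loud assumptions: it takes "function" to be sum (the approach relies on the aggregate being decomposable, which the question does not state), it needs the spark-sql dependency on the classpath, and `PreAggregatedWindow`, `withResult` and `KEY_SUM` are hypothetical names.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.sum

object PreAggregatedWindow {
  // Assumption: `function` is sum; n is the look-back distance in DATE_S units.
  def withResult(df: DataFrame, n: Long): DataFrame = {
    // One row per (ID, STR, DATE_S), so the window below sees each key only once.
    val perKey = df.groupBy("ID", "STR", "DATE_S").agg(sum("VALUE").as("KEY_SUM"))
    val w = Window.partitionBy("ID", "STR").orderBy("DATE_S").rangeBetween(-n, -1)
    val results = perKey
      .withColumn("RESULT", sum("KEY_SUM").over(w))
      .select("ID", "STR", "DATE_S", "RESULT")
    // Attach the per-key result to every original row.
    df.join(results, Seq("ID", "STR", "DATE_S"))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]").appName("pre-agg-window").getOrCreate()
    import spark.implicits._
    // The (ID = 1, STR = "A") rows from the question.
    val df = Seq(
      (1, 1, "A", 0.5), (1, 1, "A", 1.23), (1, 1, "A", -0.4),
      (2, 1, "A", 2.0), (3, 1, "A", -1.2), (3, 1, "A", 0.523)
    ).toDF("DATE_S", "ID", "STR", "VALUE")
    withResult(df, 2L).orderBy("ID", "STR", "DATE_S").show()
    spark.stop()
  }
}
```

This trades the window over all rows for a groupBy plus a join, so whether it is actually faster depends on the data and would need to be measured. For an aggregate that cannot be built from per-key partial results, the pre-aggregation step does not apply as written.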
Upvotes: 1