Sam

Reputation: 475

How do group by and window functions interact in Spark SQL?

From this question, I learned that window functions are evaluated after the GROUP BY clause in PostgreSQL.

I'd like to know what happens when a GROUP BY and a window function appear in the same query in Spark. I have the same questions as the poster of the previous question:

Upvotes: 1

Views: 2376

Answers (1)

notNull

Reputation: 31540

If a window function and a group by appear in the same query, then:

  • The group by is performed first, and the window function is then applied to the grouped dataset.

  • You can check the query's explain plan for more details.

Example:

//sample data
spark.sql("select * from tmp").show()
//+-------+----+
//|trip_id|name|
//+-------+----+
//|      1|   a|
//|      2|   b|
//+-------+----+


spark.sql("select row_number() over(order by trip_id),trip_id,count(*) cnt from tmp group by trip_id").explain()
//== Physical Plan ==
//*(4) Project [row_number() OVER (ORDER BY trip_id ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#150, trip_id#10, cnt#140L]
//+- Window [row_number() windowspecdefinition(trip_id#10 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS row_number() OVER (ORDER BY trip_id ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#150], [trip_id#10 ASC NULLS FIRST]
//   +- *(3) Sort [trip_id#10 ASC NULLS FIRST], false, 0
//      +- Exchange SinglePartition
//         +- *(2) HashAggregate(keys=[trip_id#10], functions=[count(1)])
//            +- Exchange hashpartitioning(trip_id#10, 200)
//               +- *(1) HashAggregate(keys=[trip_id#10], functions=[partial_count(1)])
//                  +- LocalTableScan [trip_id#10]

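Read the plan from the bottom up: the HashAggregate steps (stages *(1) and *(2)) perform the group by first.

The Window operator sits above them, so the window function is applied to the already grouped result, and the final Project in *(4) emits the output columns.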
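To see the same ordering in the results rather than in the plan, here is a minimal sketch. It assumes a local SparkSession and a made-up `tmp` table in which trip_id 1 appears twice; the object name `GroupByThenWindow` and the sample rows are only for illustration.

```scala
import org.apache.spark.sql.SparkSession

object GroupByThenWindow {
  def main(args: Array[String]): Unit = {
    // Assumption: a throwaway local session; in spark-shell `spark` already exists.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("groupby-then-window")
      .getOrCreate()
    import spark.implicits._

    // Hypothetical sample data: trip_id 1 occurs twice, trip_id 2 once.
    Seq((1, "a"), (1, "b"), (2, "c"))
      .toDF("trip_id", "name")
      .createOrReplaceTempView("tmp")

    // The window functions run over the grouped rows (one row per trip_id),
    // so row_number() numbers groups and sum(count(*)) adds up the group counts.
    spark.sql(
      """select trip_id,
        |       count(*) as cnt,
        |       row_number() over (order by trip_id) as rn,
        |       sum(count(*)) over () as total
        |from tmp
        |group by trip_id
        |order by trip_id""".stripMargin).show()

    spark.stop()
  }
}
```

With this data the result should have two rows, not three: trip_id 1 with cnt 2 and rn 1, and trip_id 2 with cnt 1 and rn 2, with total equal to 3 on both. The row numbers stop at 2 because the window only sees the grouped output.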

If the window function is in a subquery and the outer query has the group by, then the subquery (window) is executed first and the outer query (group by) is executed next.

Ex: spark.sql("select trip_id, count(*) from (select *, row_number() over (order by trip_id) rn from tmp) e group by trip_id").explain()
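The subquery case can also be checked from the output. The sketch below assumes a local SparkSession and a hypothetical three-row `tmp` table (trip_id 1 twice, trip_id 2 once); the object name `WindowThenGroupBy` and the `max_rn` column are illustrative additions, not part of the original query.

```scala
import org.apache.spark.sql.SparkSession

object WindowThenGroupBy {
  def main(args: Array[String]): Unit = {
    // Assumption: a throwaway local session; in spark-shell `spark` already exists.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("window-then-groupby")
      .getOrCreate()
    import spark.implicits._

    // Hypothetical sample data: three ungrouped rows.
    Seq((1, "a"), (1, "b"), (2, "c"))
      .toDF("trip_id", "name")
      .createOrReplaceTempView("tmp")

    // The inner row_number() runs over all three raw rows (rn = 1, 2, 3);
    // only afterwards does the outer query group by trip_id.
    spark.sql(
      """select trip_id,
        |       count(*) as cnt,
        |       max(rn)  as max_rn
        |from (select *, row_number() over (order by trip_id) as rn from tmp) e
        |group by trip_id
        |order by trip_id""".stripMargin).show()

    spark.stop()
  }
}
```

Here max_rn should be 2 for trip_id 1 and 3 for trip_id 2, which is only possible if the window numbered the rows before they were grouped.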

Upvotes: 1
