We all know that, in SQL, we generally have a defined lexical order of operations when writing a query:
SELECT ...
FROM ...
JOIN ...
WHERE ...
GROUP BY ...
HAVING ...
ORDER BY ...
How is that manifested in Spark? I know it's all about attributes of particular objects, so let me ask the question in a different way: what's a useful way to think about the lexical order of operations when writing Spark applications, for people coming from SQL?
To illustrate my confusion, here are two pieces of code from my tests, where I put orderBy in two quite different spots (again, coming from an SQL background), yet the code yielded exactly the same results:
tripDatawithDT \
    .filter(tripData["Subscriber Type"] == "Subscriber")\
    .orderBy(desc("End Date DT"))\
    .groupBy("End Date DT")\
    .count()\
    .show()
tripDatawithDT \
    .filter(tripData["Subscriber Type"] == "Subscriber")\
    .groupBy("End Date DT")\
    .count()\
    .orderBy(desc("End Date DT"))\
    .show()
Still, there are other cases where I completely mess up my code by getting the lexical order of operations wrong.
TL;DR As long as you use a standard open source build without custom optimizer Rules, you can assume that each DSL operation induces a logical subquery, and that all logical optimizations are consistent with the SQL:2003 standard. In other words, your SQL intuition should be applicable here.
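One handy way to see this for yourself is to ask Spark for all of its plans. A minimal sketch (df stands for any DataFrame you already have; the column names are taken from the question and are an assumption about your data):

# Sketch: df is an existing DataFrame with the columns from the question.
# Each chained DSL call below adds one node (a logical subquery) to the plan.
from pyspark.sql.functions import col, desc

(df
    .filter(col("Subscriber Type") == "Subscriber")
    .orderBy(desc("End Date DT"))
    .groupBy("End Date DT")
    .count()
    .explain(extended=True))  # prints parsed, analyzed, optimized and physical plans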
Internally Spark represents SQL queries as a tree of LogicalPlans, where each operator corresponds to a single node, with its inputs as the children.
As a result, the unoptimized logical plan corresponding to a DSL expression consists of a nested node for each operator (projection, selection, ordering, aggregation with or without grouping). So given the table
from pyspark.sql.functions import col, desc

t0 = spark.createDataFrame(
    [], "`End Date DT` timestamp, `Subscriber Type` string"
)
t0.createOrReplaceTempView("t0")
the first query
(t0.alias("t0")
.filter(col("Subscriber Type") == "Subscriber").alias("t1")
.orderBy(desc("End Date DT")).alias("t2")
.groupBy("End Date DT")
.count())
is roughly equivalent* to
SELECT `End Date DT`, COUNT(*) AS count FROM (
    SELECT * FROM (
        SELECT * FROM t0 WHERE `Subscriber Type` = 'Subscriber'
    ) as t1 ORDER BY `End Date DT` DESC
) as t2 GROUP BY `End Date DT`
while
(t0.alias("t0")
.filter(col("Subscriber Type") == "Subscriber").alias("t1")
.groupBy("End Date DT")
.count().alias("t2")
.orderBy(desc("End Date DT")))
is roughly equivalent** to
SELECT * FROM (
    SELECT `End Date DT`, COUNT(*) AS count FROM (
        SELECT * FROM t0 WHERE `Subscriber Type` = 'Subscriber'
    ) as t1 GROUP BY `End Date DT`
) as t2 ORDER BY `End Date DT` DESC
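If you want to check this yourself, both SQL forms can be run against the t0 temporary view registered above and inspected with explain. A rough sketch (the variable names are mine):

# Sketch: run both SQL formulations against the t0 temp view registered earlier
# and compare the plans Spark produces for them.
order_then_group = spark.sql("""
    SELECT `End Date DT`, COUNT(*) AS count FROM (
        SELECT * FROM (
            SELECT * FROM t0 WHERE `Subscriber Type` = 'Subscriber'
        ) as t1 ORDER BY `End Date DT` DESC
    ) as t2 GROUP BY `End Date DT`
""")

group_then_order = spark.sql("""
    SELECT * FROM (
        SELECT `End Date DT`, COUNT(*) AS count FROM (
            SELECT * FROM t0 WHERE `Subscriber Type` = 'Subscriber'
        ) as t1 GROUP BY `End Date DT`
    ) as t2 ORDER BY `End Date DT` DESC
""")

# extended=True shows parsed, analyzed, optimized and physical plans,
# which is where the difference between the two becomes visible.
order_then_group.explain(extended=True)
group_then_order.explain(extended=True)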
Clearly the two queries are not equivalent, and this is reflected in their optimized execution plans. ORDER BY before GROUP BY corresponds to
== Optimized Logical Plan ==
Aggregate [End Date DT#38], [End Date DT#38, count(1) AS count#70L]
+- Sort [End Date DT#38 DESC NULLS LAST], true
   +- Project [End Date DT#38]
      +- Filter (isnotnull(Subscriber Type#39) && (Subscriber Type#39 = Subscriber))
         +- LogicalRDD [End Date DT#38, Subscriber Type#39], false
while ORDER BY after GROUP BY corresponds to
== Optimized Logical Plan ==
Sort [End Date DT#38 DESC NULLS LAST], true
+- Aggregate [End Date DT#38], [End Date DT#38, count(1) AS count#84L]
   +- Project [End Date DT#38]
      +- Filter (isnotnull(Subscriber Type#39) && (Subscriber Type#39 = Subscriber))
         +- LogicalRDD [End Date DT#38, Subscriber Type#39], false
So why can these give the same final result? That's because in basic cases like this one, the query planner will treat the preceding ORDER BY as a hint to apply range partitioning instead of hash partitioning. Therefore the physical plan for ORDER BY followed by GROUP BY will be
== Physical Plan ==
*(2) HashAggregate(keys=[End Date DT#38], functions=[count(1)])
+- *(2) HashAggregate(keys=[End Date DT#38], functions=[partial_count(1)])
   +- *(2) Sort [End Date DT#38 DESC NULLS LAST], true, 0
      +- Exchange rangepartitioning(End Date DT#38 DESC NULLS LAST, 200)
         +- *(1) Project [End Date DT#38]
            +- *(1) Filter (isnotnull(Subscriber Type#39) && (Subscriber Type#39 = Subscriber))
               +- Scan ExistingRDD[End Date DT#38,Subscriber Type#39]
whilst without ORDER BY*** it will default to hash partitioning
== Physical Plan ==
*(2) HashAggregate(keys=[End Date DT#38], functions=[count(1)])
+- Exchange hashpartitioning(End Date DT#38, 200)
   +- *(1) HashAggregate(keys=[End Date DT#38], functions=[partial_count(1)])
      +- *(1) Project [End Date DT#38]
         +- *(1) Filter (isnotnull(Subscriber Type#39) && (Subscriber Type#39 = Subscriber))
            +- Scan ExistingRDD[End Date DT#38,Subscriber Type#39]
Because this happens at the planning stage, which is a high-impact extension point (especially for data source providers), I'd consider this an implementation detail and wouldn't depend on this behavior for correctness.
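In practice, if the output order matters, the safe pattern is to sort after the aggregation, so that the ordering is part of the query's semantics rather than a planner-dependent side effect. A minimal sketch (reusing the t0 DataFrame defined above):

# Sketch (assumes t0 as defined above): when the result must be sorted,
# apply orderBy after the aggregation rather than before it.
from pyspark.sql.functions import col, desc

(t0
    .filter(col("Subscriber Type") == "Subscriber")
    .groupBy("End Date DT")
    .count()
    .orderBy(desc("End Date DT"))
    .show())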
* With parsed logical plan for DSL variant
== Parsed Logical Plan ==
'Aggregate ['End Date DT], [unresolvedalias('End Date DT, None), count(1) AS count#45L]
+- SubqueryAlias `t2`
   +- Sort [End Date DT#38 DESC NULLS LAST], true
      +- SubqueryAlias `t1`
         +- Filter (Subscriber Type#39 = Subscriber)
            +- SubqueryAlias `t0`
               +- LogicalRDD [End Date DT#38, Subscriber Type#39], false
and for SQL variant
== Parsed Logical Plan ==
'Aggregate ['End Date DT], ['End Date DT, 'COUNT(1) AS count#50]
+- 'SubqueryAlias `t2`
   +- 'Sort ['End Date DT DESC NULLS LAST], true
      +- 'Project [*]
         +- 'SubqueryAlias `t1`
            +- 'Project [*]
               +- 'Filter ('Subscriber Type = Subscriber)
                  +- 'UnresolvedRelation `t0`
** With parsed logical plan for DSL variant
== Parsed Logical Plan ==
'Sort ['End Date DT DESC NULLS LAST], true
+- SubqueryAlias `t2`
   +- Aggregate [End Date DT#38], [End Date DT#38, count(1) AS count#59L]
      +- SubqueryAlias `t1`
         +- Filter (Subscriber Type#39 = Subscriber)
            +- SubqueryAlias `t0`
               +- LogicalRDD [End Date DT#38, Subscriber Type#39], false
and for SQL variant
== Parsed Logical Plan ==
'Sort ['End Date DT DESC NULLS LAST], true
+- 'Project [*]
   +- 'SubqueryAlias `t2`
      +- 'Aggregate ['End Date DT], ['End Date DT, 'COUNT(1) AS count#64]
         +- 'SubqueryAlias `t1`
            +- 'Project [*]
               +- 'Filter ('Subscriber Type = Subscriber)
                  +- 'UnresolvedRelation `t0`
*** i.e.
(t0.alias("t0")
.filter(col("Subscriber Type") == "Subscriber").alias("t1")
.groupBy("End Date DT")
.count()).explain()