Georgi Raychev

Reputation: 1334

Spark lexical order of operations

We all know that, in SQL, generally, we have a defined order of lexical operations when writing the code:

SELECT ...
FROM ...
JOIN ...
WHERE ...
GROUP BY ...
HAVING ...
ORDER BY ...

How does that manifest in Spark? I do know it's all about attributes of particular objects, so let me ask the question in a different way: what's a useful way to think about the lexical order of operations when writing Spark applications, for people coming from SQL?

To illustrate my confusion, here are two pieces of code from my tests, where I put orderBy in two quite different spots (again, coming from a SQL background), yet the code yields exactly the same results:

tripDatawithDT \
.filter(tripData["Subscriber Type"] == "Subscriber")\
.orderBy(desc("End Date DT"))\
.groupBy("End Date DT")\
.count()\
.show()


tripDatawithDT \
.filter(tripData["Subscriber Type"] == "Subscriber")\
.groupBy("End Date DT")\
.count()\
.orderBy(desc("End Date DT"))\
.show()

Still, there are other cases where I completely mess up my code by getting the lexical order of operations wrong.

Upvotes: 4

Views: 2159

Answers (1)

10465355

Reputation: 4631

TL;DR As long as you use the standard open source build without custom optimizer Rules, you can assume that each DSL operation induces a logical subquery, and that all logical optimizations are consistent with the SQL:2003 standard. In other words, your SQL intuition should be applicable here.

Internally Spark represents SQL queries as a tree of LogicalPlans, where each operator corresponds to a single node, with its inputs as the children.
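You can dump this tree yourself from PySpark. A minimal sketch (assuming an active SparkSession named spark); explain(True) prints the parsed, analyzed and optimized logical plans together with the physical plan:

from pyspark.sql.functions import col, desc

df = spark.createDataFrame(
    [], "`End Date DT` timestamp, `Subscriber Type` string"
)

# Each DSL call below becomes one nested node in the parsed logical plan;
# explain(True) prints the parsed, analyzed and optimized plans plus the physical plan.
(df
    .filter(col("Subscriber Type") == "Subscriber")
    .groupBy("End Date DT")
    .count()
    .orderBy(desc("End Date DT"))
    .explain(True))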

As a result, the unoptimized logical plan corresponding to a DSL expression consists of a nested node for each operator (projection, selection, ordering, aggregation with or without grouping). So given the table

from pyspark.sql.functions import col, desc

t0 = spark.createDataFrame(
    [], "`End Date DT` timestamp, `Subscriber Type` string"
)
t0.createOrReplaceTempView("t0")

the first query

(t0.alias("t0")
    .filter(col("Subscriber Type") == "Subscriber").alias("t1")
    .orderBy(desc("End Date DT")).alias("t2")
    .groupBy("End Date DT")
    .count())

is roughly equivalent* to

SELECT `End Date DT`, COUNT(*)  AS count FROM (
    SELECT * FROM (
        SELECT * FROM t0 WHERE `Subscriber Type` = 'Subscriber'
    ) as t1 ORDER BY `End Date DT` DESC
) as t2 GROUP BY `End Date DT`

while

(t0.alias("t0")
    .filter(col("Subscriber Type") == "Subscriber").alias("t1")
    .groupBy("End Date DT")
    .count().alias("t2")
    .orderBy(desc("End Date DT")))

is roughly equivalent** to

SELECT * FROM (
    SELECT `End Date DT`, COUNT(*) AS count FROM (
        SELECT * FROM t0 WHERE `Subscriber Type` = 'Subscriber'
    ) as t1 GROUP BY `End Date DT`
) as t2 ORDER BY `End Date DT` DESC

Clearly the two queries are not equivalent, and this is reflected in their optimized execution plans.

ORDER BY before GROUP BY corresponds to

== Optimized Logical Plan ==
Aggregate [End Date DT#38], [End Date DT#38, count(1) AS count#70L]
+- Sort [End Date DT#38 DESC NULLS LAST], true
   +- Project [End Date DT#38]
      +- Filter (isnotnull(Subscriber Type#39) && (Subscriber Type#39 = Subscriber))
         +- LogicalRDD [End Date DT#38, Subscriber Type#39], false

while ORDER BY after GROUP BY corresponds to

== Optimized Logical Plan ==
Sort [End Date DT#38 DESC NULLS LAST], true
+- Aggregate [End Date DT#38], [End Date DT#38, count(1) AS count#84L]
   +- Project [End Date DT#38]
      +- Filter (isnotnull(Subscriber Type#39) && (Subscriber Type#39 = Subscriber))
         +- LogicalRDD [End Date DT#38, Subscriber Type#39], false

So why can these give the same final result? Because in basic cases like this one, the query planner treats a preceding ORDER BY as a hint to apply range partitioning instead of hash partitioning. Therefore the physical plan for ORDER BY followed by GROUP BY will be

== Physical Plan ==
*(2) HashAggregate(keys=[End Date DT#38], functions=[count(1)])
+- *(2) HashAggregate(keys=[End Date DT#38], functions=[partial_count(1)])
   +- *(2) Sort [End Date DT#38 DESC NULLS LAST], true, 0
      +- Exchange rangepartitioning(End Date DT#38 DESC NULLS LAST, 200)
         +- *(1) Project [End Date DT#38]
            +- *(1) Filter (isnotnull(Subscriber Type#39) && (Subscriber Type#39 = Subscriber))
               +- Scan ExistingRDD[End Date DT#38,Subscriber Type#39]

whilst without the ORDER BY*** it will default to hash partitioning

== Physical Plan ==
*(2) HashAggregate(keys=[End Date DT#38], functions=[count(1)])
+- Exchange hashpartitioning(End Date DT#38, 200)
   +- *(1) HashAggregate(keys=[End Date DT#38], functions=[partial_count(1)])
      +- *(1) Project [End Date DT#38]
         +- *(1) Filter (isnotnull(Subscriber Type#39) && (Subscriber Type#39 = Subscriber))
            +- Scan ExistingRDD[End Date DT#38,Subscriber Type#39]

Because this happens at the planning stage, which is a high-impact extension point (especially for data source providers), I'd consider this an implementation detail, and wouldn't depend on this behavior for correctness.
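In practice, if you want the aggregated output ordered, put the orderBy after the aggregation, as in the second snippet from the question, so the ordering is part of the query's semantics rather than something the planner happens to preserve. A minimal sketch using the t0 table from above:

from pyspark.sql.functions import col, desc

# Sorting after the aggregation makes the ordering part of the result,
# instead of relying on a pre-aggregation ORDER BY surviving planning.
(t0
    .filter(col("Subscriber Type") == "Subscriber")
    .groupBy("End Date DT")
    .count()
    .orderBy(desc("End Date DT"))
    .show())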


* With parsed logical plan for DSL variant

== Parsed Logical Plan ==
'Aggregate ['End Date DT], [unresolvedalias('End Date DT, None), count(1) AS count#45L]
+- SubqueryAlias `t2`
   +- Sort [End Date DT#38 DESC NULLS LAST], true
      +- SubqueryAlias `t1`
         +- Filter (Subscriber Type#39 = Subscriber)
            +- SubqueryAlias `t0`
               +- LogicalRDD [End Date DT#38, Subscriber Type#39], false

and for SQL variant

== Parsed Logical Plan ==
'Aggregate ['End Date DT], ['End Date DT, 'COUNT(1) AS count#50]
+- 'SubqueryAlias `t2`
   +- 'Sort ['End Date DT DESC NULLS LAST], true
      +- 'Project [*]
         +- 'SubqueryAlias `t1`
            +- 'Project [*]
               +- 'Filter ('Subscriber Type = Subscriber)
                  +- 'UnresolvedRelation `t0`

** With parsed logical plan for DSL variant

== Parsed Logical Plan ==
'Sort ['End Date DT DESC NULLS LAST], true
+- SubqueryAlias `t2`
   +- Aggregate [End Date DT#38], [End Date DT#38, count(1) AS count#59L]
      +- SubqueryAlias `t1`
         +- Filter (Subscriber Type#39 = Subscriber)
            +- SubqueryAlias `t0`
               +- LogicalRDD [End Date DT#38, Subscriber Type#39], false

and for SQL variant

== Parsed Logical Plan ==
'Sort ['End Date DT DESC NULLS LAST], true
+- 'Project [*]
   +- 'SubqueryAlias `t2`
      +- 'Aggregate ['End Date DT], ['End Date DT, 'COUNT(1) AS count#64]
         +- 'SubqueryAlias `t1`
            +- 'Project [*]
               +- 'Filter ('Subscriber Type = Subscriber)
                  +- 'UnresolvedRelation `t0`

*** i.e.

(t0.alias("t0") 
    .filter(col("Subscriber Type") == "Subscriber").alias("t1") 
    .groupBy("End Date DT") 
    .count()).explain()   

Upvotes: 5
