Joe C
Joe C

Reputation: 2827

Spark SQL repeats computing the same subquery when union

I have a union of two exactly the same subquery. However based on the query explain, Spark SQL seems running the same subquery twice. Is the expected?

    In [20]: session.sql('(select count(city_code) as c from location group by country_code having c < 10) union (select count(city_code) as c from location group by country_code having c < 10)').explain(True)


== Parsed Logical Plan ==
'Distinct
+- 'Union
   :- 'Filter ('c < 10)
   :  +- 'Aggregate ['country_code], ['count('city_code) AS c#228]
   :     +- 'UnresolvedRelation `location`
   +- 'Filter ('c < 10)
      +- 'Aggregate ['country_code], ['count('city_code) AS c#229]
         +- 'UnresolvedRelation `location`

== Analyzed Logical Plan ==
c: bigint
Distinct
+- Union
   :- Filter (c#228L < cast(10 as bigint))
   :  +- Aggregate [country_code#231], [count(city_code#234) AS c#228L]
   :     +- SubqueryAlias location
   :        +- Relation[uid#230L,country_code#231,country_category#232,region_code#233,city_code#234,time#235L] parquet
   +- Filter (c#229L < cast(10 as bigint))
      +- Aggregate [country_code#237], [count(city_code#240) AS c#229L]
         +- SubqueryAlias location
            +- Relation[country_code#237,city_code#240] parquet

== Optimized Logical Plan ==
Aggregate [c#228L], [c#228L]
+- Union
   :- Filter (c#228L < 10)
   :  +- Aggregate [country_code#231], [count(city_code#234) AS c#228L]
   :     +- Project [country_code#231, city_code#234]
   :        +- Relation[country_code#231,city_code#234] parquet
   +- Filter (c#229L < 10)
      +- Aggregate [country_code#237], [count(city_code#240) AS c#229L]
         +- Project [country_code#237, city_code#240]
            +- Relation[country_code#237,city_code#240] parquet

== Physical Plan ==
*HashAggregate(keys=[c#228L], functions=[], output=[c#228L])
+- Exchange hashpartitioning(c#228L, 200)
   +- *HashAggregate(keys=[c#228L], functions=[], output=[c#228L])
      +- Union
         :- *Filter (c#228L < 10)
         :  +- *HashAggregate(keys=[country_code#231], functions=[count(city_code#234)], output=[c#228L])
         :     +- Exchange hashpartitioning(country_code#231, 200)
         :        +- *HashAggregate(keys=[country_code#231], functions=[partial_count(city_code#234)], output=[country_code#231, count#255L])
         :           +- *FileScan parquet default.location[country_code#231,city_code#234] Batched: true, Format: Parquet, Location: InMemoryFileIndex[.../location], PartitionFilters: [], PushedFilters: [],
 ReadSchema: struct<country_code:string,city_code:string>
         +- *Filter (c#229L < 10)
            +- *HashAggregate(keys=[country_code#237], functions=[count(city_code#240)], output=[c#229L])
               +- ReusedExchange [country_code#237, count#257L], Exchange hashpartitioning(country_code#231, 200)

Upvotes: 1

Views: 561

Answers (2)

RefiPeretz
RefiPeretz

Reputation: 563

Short answer: yes.

Consider this scala spark code example

val data = sqlContext.read.table("location")
val a = data.groupBy("country_code").agg(count(city_code) as "c").filter($"c" < 10)
val b = data.groupBy("country_code").agg(count(city_code) as "c").filter($"c" < 10)
a.union(b).show()

Are you still be surprised?.

That way it will be easier to see that spark will have two execution trees one for val a and one for val b.

Spark optimizations do not care about overlaps between two queries even if there are practically the same one. It will compute subtree for each query and optimize this subtree.

Upvotes: 4

Ged
Ged

Reputation: 18098

session.sql('(select count(city_code) as c from location 
              group by country_code having c < 10) 
             union 
             (select count(city_code) as c from location 
              group by country_code having c < 10)')

In this approach - as the pp shows - it will not consider caching. You need some external aspect to refer to, and voila I am pre-empted by an earlier answer.

Your question though: in this manner, yes it is expected. Refer to other answer as well for what I also state and that is instantiated.

NB: A WITH Clause cannot be cached either.

Upvotes: 1

Related Questions