Jiew Meng

Reputation: 88207

Understanding Spark Explain: Collect vs Global vs Local Limit

I am trying to understand the difference between different ways of doing limits in Spark/AWS Glue.

I tried using Spark SQL:

spark.sql("SELECT * FROM flights LIMIT 10")

The explain looks something like:

CollectLimit 10
+- *FileScan parquet xxxxxx.flights[Id#31,...] Batched: true, Format: Parquet, Location: CatalogFileIndex[s3://xxxxxx/flights], PartitionCount: 14509, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<...

Then I tried using the AWS Glue Data Catalog to see if it's any faster:

from awsglue.context import GlueContext
from pyspark.context import SparkContext

# In a Glue job the GlueContext is built from the running SparkContext.
glueContext = GlueContext(SparkContext.getOrCreate())

gdf = glueContext.create_dynamic_frame.from_catalog(database="xxxxxx", table_name="xxxxxx")
df = gdf.toDF()
df = df.limit(10)

df.explain(True)

df.show(10)

The explain looks like:

GlobalLimit 10
+- LocalLimit 10
   +- LogicalRDD [Id#70, ...]

The first runs in 5 minutes and the second in 4 minutes. That difference is not significant yet, but does it mean that either querying the Data Catalog is faster, or that doing the limit on the DataFrame is better than doing the limit in Spark SQL?

What's the difference between a collect limit vs. a global vs. a local limit? I am guessing the local limit means each partition is limited locally, and then the driver applies the global limit to produce the final result. But why is Spark SQL not doing this optimization as well?

Does Spark read all the underlying Parquet files before applying the limit? Is there a way to tell Spark to stop reading once it has the 10 rows in this example?

Upvotes: 1

Views: 3438

Answers (1)

DaRkMaN

Reputation: 1054

  1. SQL query vs. programmatic Dataset creation: the control flow is the same in both cases; both go through the Spark SQL Catalyst optimizer. In your case, when the query was run for the first time, it fetched the table metadata from the metastore and cached it; subsequent queries reuse the cached metadata, which is the likely reason the first query was slower.
  2. There is no logical plan node called CollectLimit; CollectLimitExec exists only as a physical plan node. In the logical plan, a limit is implemented as a LocalLimit followed by a GlobalLimit (link to code). You can see both forms with the first sketch after this list.
  3. Spark performs the limit incrementally.
    It first tries to retrieve the given number of rows from one partition. If that does not satisfy the limit, Spark queries the next 4 partitions (determined by spark.sql.limit.scaleUpFactor, default 4), then 16, and so on until the limit is satisfied or the data is exhausted. The second sketch below shows how to adjust this factor.
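A minimal sketch of how to see both plan forms yourself, assuming an active SparkSession named spark and using the flights table from the question:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# explain(True) prints the parsed, analyzed and optimized logical plans plus the
# physical plan. The optimized logical plan contains GlobalLimit/LocalLimit nodes,
# while the physical plan shows CollectLimit (i.e. CollectLimitExec).
spark.sql("SELECT * FROM flights LIMIT 10").explain(True)

# The DataFrame API produces the same plans.
spark.table("flights").limit(10).explain(True)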
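And a sketch of adjusting the scale-up factor, assuming the same session; the value 8 here is only illustrative:

# spark.sql.limit.scaleUpFactor controls how many more partitions Spark scans on
# each retry while trying to satisfy a limit: with the default of 4 it scans
# 1 partition first, then 4, then 16, and so on. Raising it makes Spark scan
# more partitions per attempt.
spark.conf.set("spark.sql.limit.scaleUpFactor", "8")

spark.sql("SELECT * FROM flights LIMIT 10").show()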

Upvotes: 5
