Peter Phan

Reputation: 55

Why does the execution time of a Spark SQL query differ between the first and the second execution?

I am using spark sql to run some aggregated query on the parquet data source.

My parquet data source includes a table with columns: id int, time timestamp, location int, counter_1 long, counter_2 long, ..., counter_48 long. The total data size is about 887 MB.

My Spark version is 2.4.0. I run one master and one slave on a single machine (4 cores, 16 GB memory).

Using spark-shell, I ran the following command:

spark.time(spark.sql("SELECT location, sum(counter_1)+sum(counter_5)+sum(counter_10)+sum(counter_15)+sum(cou
nter_20)+sum(counter_25)+sum(counter_30)+sum(counter_35 )+sum(counter_40)+sum(counter_45) from parquet.`/home/hungp
han227/spark_data/counters` group by location").show())

The execution time is 17s.

The second time, I ran a similar command (changing only the columns):

spark.time(spark.sql("SELECT location, sum(counter_2)+sum(counter_6)+sum(counter_11)+sum(counter_16)+sum(cou
nter_21)+sum(counter_26)+sum(counter_31)+sum(counter_36 )+sum(counter_41)+sum(counter_46) from parquet.`/home/hungp
han227/spark_data/counters` group by location").show())

The execution time is about 3s.

My first question is: Why are the times different? I know the data is not cached, since the source is plain Parquet files. Is Spark reusing something like the query plan?

I did another test. The first command is:

spark.time(spark.sql("SELECT location, sum(counter_1)+sum(counter_5)+sum(counter_10)+sum(counter_15)+sum(cou
nter_20)+sum(counter_25)+sum(counter_30)+sum(counter_35 )+sum(counter_40)+sum(counter_45) from parquet.`/home/hungp
han227/spark_data/counters` group by location").show())

The execution time is 17s.

In the second command, I changed the aggregate function:

spark.time(spark.sql("SELECT location, avg(counter_1)+avg(counter_5)+avg(counter_10)+avg(counter_15)+avg(cou
nter_20)+avg(counter_25)+avg(counter_30)+avg(counter_35 )+avg(counter_40)+avg(counter_45) from parquet.`/home/hungp
han227/spark_data/counters` group by location").show())

The execution time is about 5s.

My second question is: Why is the second command faster than the first command, even though the execution-time difference is slightly smaller than in the first scenario?

Finally, I have a problem related to the above scenarios: there are about 200 formulas like:

formula1 = sum(counter_1)+sum(counter_5)+sum(counter_10)+sum(counter_15)+sum(counter_20)+sum(counter_25)+sum(counter_30)+sum(counter_35)+sum(counter_40)+sum(counter_45)

formula2 = avg(counter_2)+avg(counter_5)+avg(counter_11)+avg(counter_15)+avg(counter_21)+avg(counter_25)+avg(counter_31)+avg(counter_35)+avg(counter_41)+avg(counter_45)

I frequently have to run queries of the following form:

select formulaX, formulaY, ..., formulaZ from table where time > value1 and time < value2 and location in (value1, value2, ...) group by location
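To make the pattern concrete, the formula strings and the query can be generated in spark-shell instead of being written out by hand. A minimal sketch, assuming the Parquet data has been registered as a view named counters (that name, and the value1/value2 placeholders, are illustrative):

// Sketch only: build the repeated formulas from their column indices.
// The indices match formula1 and formula2 above.
val sumIdx = Seq(1, 5, 10, 15, 20, 25, 30, 35, 40, 45)
val avgIdx = Seq(2, 5, 11, 15, 21, 25, 31, 35, 41, 45)
val formula1 = sumIdx.map(i => s"sum(counter_$i)").mkString("+")
val formula2 = avgIdx.map(i => s"avg(counter_$i)").mkString("+")
// value1/value2 are placeholders, as in the template above; they must be
// replaced with real literals before the query can actually run.
val query = s"select location, $formula1, $formula2 from counters " +
  "where time > value1 and time < value2 and location in (value1, value2) group by location"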

My third question is: Is there any way to optimize the performance, so that a query that has been run once is faster when it is run again in the future? Does Spark optimize this by itself, or do I have to write some code or change some configuration?

Upvotes: 3

Views: 2681

Answers (2)

Joe Widen

Reputation: 2448

When doing an aggregation, Spark creates what are called shuffle files. If you run the same query twice, it will reuse the shuffle files, which are stored locally on the workers' file systems. Unfortunately, you can't rely on them always being there, because eventually the file handles get garbage collected. If you're going to run 10 queries on the same dataset, cache it or use Databricks.
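A minimal sketch of that caching approach in spark-shell, reusing the path from the question:

// Read the Parquet data once, cache it, and run the aggregations against
// the cached DataFrame. cache() is lazy: the first action materializes it,
// and subsequent queries read from memory instead of re-scanning Parquet.
val counters = spark.read.parquet("/home/hungphan227/spark_data/counters")
counters.createOrReplaceTempView("counters")
counters.cache()

spark.time(spark.sql("select location, sum(counter_1)+sum(counter_5) from counters group by location").show())
spark.time(spark.sql("select location, sum(counter_2)+sum(counter_6) from counters group by location").show())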

Upvotes: 1

Jixin Jia

Reputation: 31

It's called Exchange Reuse. When Spark performs a shuffle (e.g. for an aggregation or join), it stores a copy of the shuffle data on the local worker nodes for potential reuse. This is internally controlled behavior and cannot be directly influenced by the end user. If you find that you keep reusing a particular portion of data (or query result), you could consider explicitly caching it with cache(). However, bear in mind that although this allows Spark to reuse the cached result for potentially faster query performance (if, and only if, the analyzed plan of your cached query matches your new query), overusing CACHE can cause a whole lot of different performance problems.
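Since the cache is only reused when the plans match, it is worth verifying this yourself. A quick sketch, assuming a cached view named counters: an InMemoryTableScan (or InMemoryRelation) node in the physical plan means the cached data is being read; its absence means a full re-read of the source.

// Print the physical plan and look for InMemoryTableScan / InMemoryRelation.
spark.sql("select location, sum(counter_1) from counters group by location").explain()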

A bad example is when your dataset is very large: it may cause a disk spill problem, i.e. the dataset does not fit into your cluster's available memory and needs to be written to slower hard disks.

Another bad example is when your query only needs to access a subset of the cached data. By caching the entire dataset in memory, Spark is forced to perform a full in-memory table scan. Not only is that a waste of resources, it also results in slower query performance compared to not using the cache at all.
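A sketch of the safer pattern: cache only the slice your queries actually touch (the column list and filter below are illustrative, not taken from the question):

import org.apache.spark.sql.functions.col

// Cache just the columns and rows the queries need, not the whole table.
val subset = spark.read.parquet("/home/hungphan227/spark_data/counters")
  .select("location", "time", "counter_1", "counter_5")
  .where(col("location").isin(1, 2))
subset.cache()
subset.createOrReplaceTempView("counters_subset")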

The best thing to do is trial and error with a few of your own example queries: look at the Spark UI and check for signs of disk spill or large amounts of input data being scanned.

Every query/data combination is unique, so you'll need to experiment a bit to find the best performance-tuning method for your own workload.

Upvotes: 2
