Reputation: 54
I have 18 Spark executors, each with 102 GB of memory and 26 cores, so around 1836 GB of total memory and 468 total cores available for Spark, as you can see below:
I run the following app, which uses all the resources:
As I don't do any cache/persist/broadcast, I have set the Spark environment as follows:
So basically :
--conf "spark.memory.fraction=0.6"
--conf "spark.memory.storageFraction=0.1"
The following Spark UI Executors tab is a bit misleading, but theoretically I have:
- 0.6 * 102 GB = 61 GB (let's say 53 GB) of total storage + execution memory per executor, i.e. 18 * 53 GB ≈ 970 GB in total;
- 0.9 * 53 GB = 47 GB of Spark execution memory and 0.1 * 53 GB = 5 GB of Spark storage memory per executor, i.e. 18 * 47 GB ≈ 846 GB of execution memory in total.
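For what it's worth, the ~53 GB shown in the UI (instead of the theoretical 61 GB) is consistent with Spark's unified memory formula, which is applied to the heap the JVM actually reports (lower than -Xmx) minus a fixed 300 MB reserve. A rough sketch of that arithmetic, where the ~90 GB of JVM-reported heap is an assumption:

```scala
// Rough reconstruction of Spark's unified memory sizing (UnifiedMemoryManager).
// jvmReportedHeapGB is an assumption: Runtime.getRuntime.maxMemory is typically
// lower than -Xmx because survivor space is excluded.
val jvmReportedHeapGB = 90.0
val reservedGB        = 0.3   // fixed 300 MB reserved system memory
val memoryFraction    = 0.6   // spark.memory.fraction
val storageFraction   = 0.1   // spark.memory.storageFraction

val unifiedGB   = (jvmReportedHeapGB - reservedGB) * memoryFraction // ~53.8 GB, the value the UI reports
val storageGB   = unifiedGB * storageFraction                       // ~5.4 GB storage region
val executionGB = unifiedGB - storageGB                             // ~48.4 GB execution

println(f"unified=$unifiedGB%.1f GB, storage=$storageGB%.1f GB, execution=$executionGB%.1f GB")
```

Note that the storage/execution split is a soft boundary: execution can borrow the storage region when nothing is cached.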
As you can see on the previous screenshot:
Regarding the stages, the most important one is the last, active stage (which actively does shuffle reads and produces output):
Indeed, my app consists of one main large .join() between two datasets, which triggers a full shuffle (534.9 GB written at stage 11) and a large shuffle read at stage 16 (the screenshot shows a shuffle read of 5.8 GB out of 543.9 GB).
We can zoom in on this join thanks to the following Spark UI SQL graph:
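For context, a minimal sketch of the kind of join involved; the paths, formats and join key are assumptions, not the actual code:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()

// Hypothetical inputs: paths and the join key "id" are placeholders.
val left  = spark.read.parquet("/data/left")
val right = spark.read.parquet("/data/right")

// A plain equi-join of two large (non-broadcast) datasets: Spark plans a
// SortMergeJoin, which shuffles both sides on the join key.
left.join(right, Seq("id"))
  .write.parquet("/data/output")
```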
With some simple Unix commands, I can confirm that spark.local.dir (used for temporary files during shuffle) consumes at most 600 GB, summed across all nodes. This number matches the 534.9 GB reported by the UI during the main shuffle write.
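For reference, a rough Scala equivalent of that check on a single node; the spark.local.dir path is an assumption:

```scala
import java.io.File

// Assumption: spark.local.dir points at /tmp/spark-local on this node.
def dirSize(f: File): Long =
  if (f.isFile) f.length()
  else Option(f.listFiles()).map(_.map(dirSize).sum).getOrElse(0L)

val totalBytes = dirSize(new File("/tmp/spark-local"))
println(f"${totalBytes / math.pow(1024, 3)}%.1f GB of shuffle/temp files on this node")
```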
Question:
Why does Spark spill to disk in that case?
Indeed, Spark seems to use neither the 846 GB of execution memory nor the 970 GB of total Spark memory (execution + storage) to hold part of the shuffle write results and speed up the computation. Spark seems to spill all the data to disk: around 600 GB is written on disk, and that 600 GB appears to be compressed data (as spark.shuffle.compress=true).
I know that the uncompressed data might be about 3.9 TB, as the Spark UI suggests. But since this is Spark 3.0.2, which is supposed to support efficient Kryo serialization internally, it still remains unclear why Spark spills everything in that case.
Upvotes: 3
Views: 1491
Reputation: 818
Perhaps your data is skewed and not well distributed between partitions? This may cause one executor to resort to spilling. You can repartition your data, but that will just kick off a shuffle with spilling.
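One way to check this hypothesis is to count rows per join key on one side of the join; a handful of dominant keys will produce a few huge shuffle partitions. A minimal sketch, where the path and the key column "id" are assumptions:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{count, desc}

val spark = SparkSession.builder().getOrCreate()

// Assumptions: one side of the join lives at this path and "id" is the join key.
val side = spark.read.parquet("/data/left")

side.groupBy("id")
  .agg(count("*").as("rows"))
  .orderBy(desc("rows"))
  .show(20, truncate = false)   // a few keys with huge counts => skew
```

On Spark 3.0+ you can also let adaptive query execution split skewed partitions (spark.sql.adaptive.enabled and spark.sql.adaptive.skewJoin.enabled).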
Also, when counting memory, there is the memory overhead of 10% (spark.executor.memoryOverhead): Spark tries to prevent OOM by not using more than 90% of the available memory for the heap (sometimes this doesn't succeed and you have to increase the value).
Note that other files may also go into spark.local.dir; not everything there is necessarily a shuffle spill file.
Some other optimisation hints:
spark.local.dir: if you have more than one physical drive, you can specify multiple dirs (comma-separated) to spread IO.
You mention Kryo, so I assume you are using RDDs. You can possibly improve RDD performance with a later JDK version (JDK 9+ has compact strings) and possibly use more executors with less memory (object pointers are 64-bit on a 102 GB heap, but 32-bit when the heap is below ~30 GB). Then again, these may perform worse due to extra executor communication.
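A hedged sketch of those two hints as configuration; the directory paths and executor sizing are assumptions to adapt to the actual hosts:

```scala
import org.apache.spark.SparkConf

// Sketch only: paths and sizes are placeholders, not recommendations.
val conf = new SparkConf()
  // Spread shuffle/temp IO across several physical drives (comma-separated list).
  .set("spark.local.dir", "/mnt/disk1/spark,/mnt/disk2/spark")
  // "More executors with less memory": staying under ~30 GB of heap lets the
  // JVM use compressed (32-bit) object pointers.
  .set("spark.executor.memory", "28g")
  .set("spark.executor.cores", "7")
```

Note that on YARN the node manager's local directories take precedence over spark.local.dir.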
Upvotes: 1
Reputation: 18003
From an AWS guide, https://aws.amazon.com/blogs/big-data/introducing-the-cloud-shuffle-storage-plugin-for-apache-spark/#:~:text=In%20Apache%20Spark%2C%20shuffling%20happens,which%20can%20cause%20straggling%20executors, but also to be found elsewhere.
Apache Spark utilizes in-memory caching and optimized query execution for fast analytic queries against your datasets, which are split into multiple Spark partitions on different nodes so that you can process a large amount of data in parallel.
In Apache Spark, shuffling happens when data needs to be redistributed across the cluster. During a shuffle, data is written to local disk and transferred across the network. The shuffle operation is often constrained by the available local disk capacity, or data skew, which can cause straggling executors.
That is to say, Spark's architecture is to write mapper output to local disk for the reducer-phase tasks to consume. A join fits into that approach as well, obviously.
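You can see that architecture directly in the physical plan: joining two large, non-broadcast inputs produces Exchange (shuffle) nodes, whose map output is written to local disk before the other side fetches it. A minimal sketch, where the inputs are synthetic placeholders:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()

// Two inputs large enough that neither side is broadcast.
val a = spark.range(0L, 100000000L).toDF("id")
val b = spark.range(0L, 100000000L).toDF("id")

// The printed physical plan contains SortMergeJoin and Exchange
// hashpartitioning nodes, i.e. the shuffle that goes through spark.local.dir.
a.join(b, "id").explain()
```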
Upvotes: 5
Reputation: 63
Based on your memory settings and the given resources, Spark should be able to keep most, if not all, of the shuffle data in memory.
If Spark is still spilling data to disk, it may be due to other factors such as the size of the shuffle blocks, or the complexity of the data.
Also, did you monitor your hosts' (or VMs') memory usage during the job?
I suspect the data. We had such cases in a Spark cluster where Spark showed some weird behavior during heavy shuffle-related jobs, and after spending months investigating, we finally identified the culprit: the data! It contained some non-ASCII and non-printable values.
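If bad key values are the suspicion, a quick way to check is to count rows whose join key contains non-printable or non-ASCII characters; the path and the column name are assumptions:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder().getOrCreate()

// Assumptions: dataset location and "id" as the (string-typed) join key.
val df = spark.read.parquet("/data/left")

// Keep rows whose key contains anything outside printable ASCII (0x20-0x7E).
val suspicious = df.filter(col("id").rlike("[^\\x20-\\x7E]"))
println(s"rows with non-printable/non-ASCII key characters: ${suspicious.count()}")
```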
Upvotes: 1